@@ -20,6 +20,7 @@ package org.apache.spark.ui.jobs
2020import java .util .Date
2121import javax .servlet .http .HttpServletRequest
2222
23+ import scala .collection .mutable .HashSet
2324import scala .xml .{Elem , Node , Unparsed }
2425
2526import org .apache .commons .lang3 .StringEscapeUtils
@@ -208,10 +209,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
208209
209210 val unzipped = taskHeadersAndCssClasses.unzip
210211
212+ val currentTime = System .currentTimeMillis()
211213 val taskTable = UIUtils .listingTable(
212214 unzipped._1,
213215 taskRow(hasAccumulators, stageData.hasInput, stageData.hasOutput,
214- stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled),
216+ stageData.hasShuffleRead, stageData.hasShuffleWrite,
217+ stageData.hasBytesSpilled, currentTime),
215218 tasks,
216219 headerClasses = unzipped._2)
217220 // Excludes tasks which failed and have incomplete metrics
@@ -431,28 +434,219 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
431434 val maybeAccumulableTable : Seq [Node ] =
432435 if (accumulables.size > 0 ) { <h4 >Accumulators </h4 > ++ accumulableTable } else Seq ()
433436
437+ val executorsSet = new HashSet [(String , String )]
438+
439+ var minLaunchTime = Long .MaxValue
440+ var maxFinishTime = Long .MinValue
441+ var numEffectiveTasks = 0
442+ val executorsArrayStr = stageData.taskData.flatMap {
443+ case (_, taskUIData) =>
444+ val taskInfo = taskUIData.taskInfo
445+
446+ val executorId = taskInfo.executorId
447+ val host = taskInfo.host
448+ executorsSet += ((executorId, host))
449+
450+ val taskId = taskInfo.taskId
451+ val taskIdWithIndexAndAttempt = s " Task ${taskId}( ${taskInfo.id}) "
452+
453+ val isSucceeded = taskInfo.successful
454+ val isFailed = taskInfo.failed
455+ val isRunning = taskInfo.running
456+ val classNameByStatus = {
457+ if (isSucceeded) {
458+ " succeeded"
459+ } else if (isFailed) {
460+ " failed"
461+ } else if (isRunning) {
462+ " running"
463+ }
464+ }
465+
466+ if (isSucceeded || isRunning || isFailed) {
467+ val launchTime = taskInfo.launchTime
468+ val finishTime = if (! isRunning) taskInfo.finishTime else currentTime
469+ val totalExecutionTime = finishTime - launchTime
470+ minLaunchTime = launchTime.min(minLaunchTime)
471+ maxFinishTime = launchTime.max(maxFinishTime)
472+ numEffectiveTasks += 1
473+
474+ val metricsOpt = taskUIData.taskMetrics
475+ val shuffleReadTime =
476+ metricsOpt.flatMap(_.shuffleReadMetrics.map(_.fetchWaitTime)).getOrElse(0L ).toDouble
477+ val shuffleReadTimeProportion =
478+ (shuffleReadTime / totalExecutionTime * 100 ).toLong
479+ val shuffleWriteTime =
480+ metricsOpt.flatMap(_.shuffleWriteMetrics.map(_.shuffleWriteTime)).getOrElse(0L ) / 1e6
481+ val shuffleWriteTimeProportion =
482+ (shuffleWriteTime / totalExecutionTime * 100 ).toLong
483+ val executorRuntimeProportion =
484+ ((metricsOpt.map(_.executorRunTime).getOrElse(0L ) -
485+ shuffleReadTime - shuffleWriteTime) / totalExecutionTime * 100 ).toLong
486+ val serializationTimeProportion =
487+ (metricsOpt.map(_.resultSerializationTime).getOrElse(0L ).toDouble /
488+ totalExecutionTime * 100 ).toLong
489+ val deserializationTimeProportion =
490+ (metricsOpt.map(_.executorDeserializeTime).getOrElse(0L ).toDouble /
491+ totalExecutionTime * 100 ).toLong
492+ val gettingResultTimeProportion =
493+ (getGettingResultTime(taskUIData.taskInfo).toDouble / totalExecutionTime * 100 ).toLong
494+ val schedulerDelayProportion =
495+ 100 - executorRuntimeProportion - shuffleReadTimeProportion -
496+ shuffleWriteTimeProportion - serializationTimeProportion -
497+ deserializationTimeProportion - gettingResultTimeProportion
498+
499+ val schedulerDelayProportionPos = 0
500+ val deserializationTimeProportionPos =
501+ schedulerDelayProportionPos + schedulerDelayProportion
502+ val shuffleReadTimeProportionPos =
503+ deserializationTimeProportionPos + deserializationTimeProportion
504+ val executorRuntimeProportionPos =
505+ shuffleReadTimeProportionPos + shuffleReadTimeProportion
506+ val shuffleWriteTimeProportionPos =
507+ executorRuntimeProportionPos + executorRuntimeProportion
508+ val serializationTimeProportionPos =
509+ shuffleWriteTimeProportionPos + shuffleWriteTimeProportion
510+ val gettingResultTimeProportionPos =
511+ serializationTimeProportionPos + serializationTimeProportion
512+
513+ val timelineObject =
514+ s """
515+ |{
516+ | 'className': 'task task-assignment-timeline-object ${classNameByStatus}',
517+ | 'group': ' ${executorId}',
518+ | 'content': '<div class="task-assignment-timeline-content">' +
519+ | ' ${taskIdWithIndexAndAttempt}</div>' +
520+ | '<svg class="task-assignment-timeline-duration-bar">' +
521+ | '<rect x=" ${schedulerDelayProportionPos}%" y="0" height="100%"' +
522+ | 'width=" ${schedulerDelayProportion}%" fill="#F6D76B"></rect>' +
523+ | '<rect x=" ${deserializationTimeProportionPos}%" y="0" height="100%"' +
524+ | 'width=" ${deserializationTimeProportion}%" fill="#FFBDD8"></rect>' +
525+ | '<rect x=" ${shuffleReadTimeProportionPos}%" y="0" height="100%"' +
526+ | 'width=" ${shuffleReadTimeProportion}%" fill="#8AC7DE"></rect>' +
527+ | '<rect x=" ${executorRuntimeProportionPos}%" y="0" height="100%"' +
528+ | 'width=" ${executorRuntimeProportion}%" fill="#D9EB52"></rect>' +
529+ | '<rect x=" ${shuffleWriteTimeProportionPos}%" y="0" height="100%"' +
530+ | 'width=" ${shuffleWriteTimeProportion}%" fill="#87796F"></rect>' +
531+ | '<rect x=" ${serializationTimeProportionPos}%" y="0" height="100%"' +
532+ | 'width=" ${serializationTimeProportion}%" fill="#93DFB8"></rect>' +
533+ | '<rect x=" ${gettingResultTimeProportionPos}%" y="0" height="100%"' +
534+ | 'width=" ${gettingResultTimeProportion}%" fill="#FF9036"></rect></svg>',
535+ | 'start': new Date( ${launchTime}),
536+ | 'end': new Date( ${finishTime}),
537+ | 'title': ' ${taskIdWithIndexAndAttempt}\\ nStatus: ${taskInfo.status}\\ n' +
538+ | 'Launch Time: ${UIUtils .formatDate(new Date (launchTime))}' +
539+ | ' ${
540+ if (! isRunning) {
541+ s """ \\ nFinish Time: ${UIUtils .formatDate(new Date (finishTime))}"""
542+ } else {
543+ " "
544+ }
545+ }'
546+ |}
547+ """ .stripMargin
548+ Option (timelineObject)
549+ } else {
550+ None
551+ }
552+ }.mkString(" [" , " ," , " ]" )
553+
554+ val groupArrayStr = executorsSet.map {
555+ case (executorId, host) =>
556+ s """
557+ |{
558+ | 'id': ' ${executorId}',
559+ | 'content': ' ${executorId} / ${host}',
560+ |}
561+ """ .stripMargin
562+ }.mkString(" [" , " ," , " ]" )
563+
564+ var maxWindowInSec = ((maxFinishTime - minLaunchTime) / 1000.0 ).round
565+ if (maxWindowInSec <= 0 ) maxWindowInSec = 1
566+ val tasksPerSecond = numEffectiveTasks / maxWindowInSec
567+ var maxZoom = {
568+ if (tasksPerSecond > 100 ) {
569+ 1000L / (tasksPerSecond / 100 )
570+ }
571+ else {
572+ 24L * 60 * 60 * 1000
573+ }
574+ }
575+
576+ if (maxZoom < 0 ) maxZoom = 1
577+
434578 val content =
435579 summary ++
436580 showAdditionalMetrics ++
437581 <h4 >Summary Metrics for {numCompleted} Completed Tasks </h4 > ++
438582 <div >{summaryTable.getOrElse(" No tasks have reported metrics yet." )}</div > ++
439583 <h4 >Aggregated Metrics by Executor </h4 > ++ executorTable.toNodeSeq ++
440584 maybeAccumulableTable ++
441- <h4 >Tasks </h4 > ++ taskTable
585+ <h4 >Tasks </h4 > ++ taskTable ++
586+ <h4 >Task Assignment Timeline </h4 > ++
587+ <div id =" task-assignment-timeline" >
588+ <div class =" timeline-header" >
589+ {taskAssignmentTimelineControlPanel ++ taskAssignmentTimelineLegend}
590+ </div >
591+ </div > ++
592+ <script type =" text/javascript" >
593+ {Unparsed (s " drawTaskAssignmentTimeline( " +
594+ s " ${groupArrayStr}, ${executorsArrayStr}, ${minLaunchTime}, ${maxZoom}) " )}
595+ </script >
442596
443597 UIUtils .headerSparkPage(" Details for Stage %d" .format(stageId), content, parent)
444598 }
445599 }
446600
601+ private val taskAssignmentTimelineControlPanel : Seq [Node ] = {
602+ <div class =" control-panel" >
603+ <div id =" task-assignment-timeline-zoom-lock" >
604+ <input type =" checkbox" checked =" checked" ></input >
605+ <span >Zoom Lock </span >
606+ </div >
607+ </div >
608+ }
609+
610+ private val taskAssignmentTimelineLegend : Seq [Node ] = {
611+ <div class =" legend-area" >
612+ <svg >
613+ < rect x= " 5px" y= " 5px" width= " 20px"
614+ height= " 15px" rx= " 2px" fill= " #D5DDF6" stroke= " #97B0F8" ></ rect>
615+ <text x =" 35px" y =" 17px" >Succeeded Task </text >
616+ < rect x= " 215px" y= " 5px" width= " 20px"
617+ height= " 15px" rx= " 2px" fill= " #FF5475" stroke= " #97B0F8" ></ rect>
618+ <text x =" 245px" y =" 17px" >Failed Task </text >
619+ < rect x= " 425px" y= " 5px" width= " 20px"
620+ height= " 15px" rx= " 2px" fill= " #FDFFCA" stroke= " #97B0F8" ></ rect>
621+ <text x =" 455px" y =" 17px" >Running Task </text >
622+ {
623+ val legendPairs = List ((" #FFBDD8" , " Task Deserialization Time" ),
624+ (" #8AC7DE" , " Shuffle Read Time" ), (" #D9EB52" , " Executor Computing Time" ),
625+ (" #87796F" , " Shuffle Write Time" ), (" #93DFB8" , " Result Serialization TIme" ),
626+ (" #FF9036" , " Getting Result Time" ), (" #F6D76B" , " Scheduler Delay" ))
627+
628+ legendPairs.zipWithIndex.map {
629+ case ((color, name), index) =>
630+ < rect x= {5 + (index / 3 ) * 210 + " px" } y= {35 + (index % 3 ) * 15 + " px" }
631+ width= " 10px" height= " 10px" fill= {color}></ rect>
632+ < text x= {25 + (index / 3 ) * 210 + " px" }
633+ y= {45 + (index % 3 ) * 15 + " px" }> {name}</text >
634+ }
635+ }
636+ </svg >
637+ </div >
638+ }
639+
447640 def taskRow (
448641 hasAccumulators : Boolean ,
449642 hasInput : Boolean ,
450643 hasOutput : Boolean ,
451644 hasShuffleRead : Boolean ,
452645 hasShuffleWrite : Boolean ,
453- hasBytesSpilled : Boolean )(taskData : TaskUIData ): Seq [Node ] = {
646+ hasBytesSpilled : Boolean ,
647+ currentTime : Long )(taskData : TaskUIData ): Seq [Node ] = {
454648 taskData match { case TaskUIData (info, metrics, errorMessage) =>
455- val duration = if (info.status == " RUNNING" ) info.timeRunning(System .currentTimeMillis() )
649+ val duration = if (info.status == " RUNNING" ) info.timeRunning(currentTime )
456650 else metrics.map(_.executorRunTime).getOrElse(1L )
457651 val formatDuration = if (info.status == " RUNNING" ) UIUtils .formatDuration(duration)
458652 else metrics.map(m => UIUtils .formatDuration(m.executorRunTime)).getOrElse(" " )
0 commit comments