Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,41 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)

/** Return the graph metadata for the given stage, or None if no such information exists. */
def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = {
val stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty }
val graphs = stageIds.flatMap { sid => stageIdToGraph.get(sid) }
def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = synchronized {
val _stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty }
val graphs = _stageIds.flatMap { sid => stageIdToGraph.get(sid) }
// If the metadata for some stages have been removed, do not bother rendering this job
if (stageIds.size != graphs.size) {
if (_stageIds.size != graphs.size) {
Seq.empty
} else {
graphs
}
}

/** Return the graph metadata for the given stage, or None if no such information exists. */
def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = {
def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = synchronized {
stageIdToGraph.get(stageId)
}

/** On job start, construct a RDDOperationGraph for each stage in the job for display later. */
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
val jobId = jobStart.jobId
val stageInfos = jobStart.stageInfos

jobIds += jobId
jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted

stageInfos.foreach { stageInfo =>
stageIds += stageInfo.stageId
stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
// Remove state for old stages
if (stageIds.size >= retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
stageIds.trimStart(toRemove)
}
}

// Remove state for old jobs
if (jobIds.size >= retainedJobs) {
val toRemove = math.max(retainedJobs / 10, 1)
Expand All @@ -71,15 +84,4 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
}
}

/** Remove graph metadata for old stages */
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
val stageInfo = stageSubmitted.stageInfo
stageIds += stageInfo.stageId
stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
if (stageIds.size >= retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
stageIds.trimStart(toRemove)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class RDDOperationGraphListenerSuite extends FunSuite {
assert(numStages > 0, "I will not run a job with 0 stages for you.")
val stageInfos = (0 until numStages).map { _ =>
val stageInfo = new StageInfo(stageIdCounter, 0, "s", 0, Seq.empty, Seq.empty, "d")
listener.onStageSubmitted(new SparkListenerStageSubmitted(stageInfo))
stageIdCounter += 1
stageInfo
}
Expand Down