From d11bee16b28ddf6912a7f4a606ecaedb1d0eb170 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 13 May 2015 17:31:20 -0700 Subject: [PATCH 1/3] Don't wait until all stages have started before rendering --- .../ui/scope/RDDOperationGraphListener.scala | 24 ++++++++++--------- .../RDDOperationGraphListenerSuite.scala | 1 - 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala index 3b77a1e12cc45..825d7183b42f0 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala @@ -60,9 +60,22 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen /** On job start, construct a RDDOperationGraph for each stage in the job for display later. */ override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized { val jobId = jobStart.jobId + val stageInfos = jobStart.stageInfos + jobIds += jobId jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted + stageInfos.foreach { stageInfo => + stageIds += stageInfo.stageId + stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo) + // Remove state for old stages + if (stageIds.size >= retainedStages) { + val toRemove = math.max(retainedStages / 10, 1) + stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) } + stageIds.trimStart(toRemove) + } + } + // Remove state for old jobs if (jobIds.size >= retainedJobs) { val toRemove = math.max(retainedJobs / 10, 1) @@ -71,15 +84,4 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen } } - /** Remove graph metadata for old stages */ - override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized { - val stageInfo = stageSubmitted.stageInfo - stageIds += stageInfo.stageId - stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo) - if (stageIds.size >= retainedStages) { - val toRemove = math.max(retainedStages / 10, 1) - stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) } - stageIds.trimStart(toRemove) - } - } } diff --git a/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala index 619b38ac02676..c659fc1e8b9a9 100644 --- a/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala @@ -31,7 +31,6 @@ class RDDOperationGraphListenerSuite extends FunSuite { assert(numStages > 0, "I will not run a job with 0 stages for you.") val stageInfos = (0 until numStages).map { _ => val stageInfo = new StageInfo(stageIdCounter, 0, "s", 0, Seq.empty, Seq.empty, "d") - listener.onStageSubmitted(new SparkListenerStageSubmitted(stageInfo)) stageIdCounter += 1 stageInfo } From 02542d63356027e1f9c914f243d0f2151d93150b Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 13 May 2015 17:35:52 -0700 Subject: [PATCH 2/3] Rename overloaded variable --- .../apache/spark/ui/scope/RDDOperationGraphListener.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala index 825d7183b42f0..c5811b8b4b09f 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala @@ -42,10 +42,10 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen /** Return the graph metadata for the given stage, or None if no such information exists. */ def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = { - val stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty } - val graphs = stageIds.flatMap { sid => stageIdToGraph.get(sid) } + val _stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty } + val graphs = _stageIds.flatMap { sid => stageIdToGraph.get(sid) } // If the metadata for some stages have been removed, do not bother rendering this job - if (stageIds.size != graphs.size) { + if (_stageIds.size != graphs.size) { Seq.empty } else { graphs From 19d4e9869554144e2ff86483603210ea2079c579 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 13 May 2015 17:40:06 -0700 Subject: [PATCH 3/3] Add synchronize --- .../org/apache/spark/ui/scope/RDDOperationGraphListener.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala index c5811b8b4b09f..aa9c25cb5c8c6 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala @@ -41,7 +41,7 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES) /** Return the graph metadata for the given stage, or None if no such information exists. */ - def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = { + def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = synchronized { val _stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty } val graphs = _stageIds.flatMap { sid => stageIdToGraph.get(sid) } // If the metadata for some stages have been removed, do not bother rendering this job @@ -53,7 +53,7 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen } /** Return the graph metadata for the given stage, or None if no such information exists. */ - def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = { + def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = synchronized { stageIdToGraph.get(stageId) }