Skip to content

Commit ec34230

Browse files
author
Andrew Or
committed
[SPARK-7608] Clean up old state in RDDOperationGraphListener
This is necessary for streaming and long-running Spark applications. zsxwing tdas Author: Andrew Or <[email protected]> Closes #6125 from andrewor14/viz-listener-leak and squashes the following commits: 8660949 [Andrew Or] Fix thing + add tests 33c0843 [Andrew Or] Clean up old job state (cherry picked from commit f6e1838) Signed-off-by: Andrew Or <[email protected]>
1 parent acd872b commit ec34230

File tree

2 files changed

+108
-9
lines changed

2 files changed

+108
-9
lines changed

core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,16 @@ import org.apache.spark.ui.SparkUI
2727
* A SparkListener that constructs a DAG of RDD operations.
2828
*/
2929
private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
30-
private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
31-
private val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
32-
private val stageIds = new mutable.ArrayBuffer[Int]
30+
private[ui] val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
31+
private[ui] val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
32+
33+
// Keep track of the order in which these are inserted so we can remove old ones
34+
private[ui] val jobIds = new mutable.ArrayBuffer[Int]
35+
private[ui] val stageIds = new mutable.ArrayBuffer[Int]
3336

3437
// How many jobs or stages to retain graph metadata for
38+
private val retainedJobs =
39+
conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
3540
private val retainedStages =
3641
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
3742

@@ -50,15 +55,22 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
5055
/** On job start, construct a RDDOperationGraph for each stage in the job for display later. */
5156
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
5257
val jobId = jobStart.jobId
53-
val stageInfos = jobStart.stageInfos
58+
jobIds += jobId
59+
jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted
5460

55-
stageInfos.foreach { stageInfo =>
56-
stageIds += stageInfo.stageId
57-
stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
61+
// Remove state for old jobs
62+
if (jobIds.size >= retainedJobs) {
63+
val toRemove = math.max(retainedJobs / 10, 1)
64+
jobIds.take(toRemove).foreach { id => jobIdToStageIds.remove(id) }
65+
jobIds.trimStart(toRemove)
5866
}
59-
jobIdToStageIds(jobId) = stageInfos.map(_.stageId).sorted
67+
}
6068

61-
// Remove graph metadata for old stages
69+
/** Remove graph metadata for old stages */
70+
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
71+
val stageInfo = stageSubmitted.stageInfo
72+
stageIds += stageInfo.stageId
73+
stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
6274
if (stageIds.size >= retainedStages) {
6375
val toRemove = math.max(retainedStages / 10, 1)
6476
stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.ui.scope
19+
20+
import org.scalatest.FunSuite
21+
22+
import org.apache.spark.SparkConf
23+
import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListenerStageSubmitted, StageInfo}
24+
25+
class RDDOperationGraphListenerSuite extends FunSuite {
26+
private var jobIdCounter = 0
27+
private var stageIdCounter = 0
28+
29+
/** Run a job with the specified number of stages. */
30+
private def runOneJob(numStages: Int, listener: RDDOperationGraphListener): Unit = {
31+
assert(numStages > 0, "I will not run a job with 0 stages for you.")
32+
val stageInfos = (0 until numStages).map { _ =>
33+
val stageInfo = new StageInfo(stageIdCounter, 0, "s", 0, Seq.empty, Seq.empty, "d")
34+
listener.onStageSubmitted(new SparkListenerStageSubmitted(stageInfo))
35+
stageIdCounter += 1
36+
stageInfo
37+
}
38+
listener.onJobStart(new SparkListenerJobStart(jobIdCounter, 0, stageInfos))
39+
jobIdCounter += 1
40+
}
41+
42+
test("listener cleans up metadata") {
43+
44+
val conf = new SparkConf()
45+
.set("spark.ui.retainedStages", "10")
46+
.set("spark.ui.retainedJobs", "10")
47+
48+
val listener = new RDDOperationGraphListener(conf)
49+
assert(listener.jobIdToStageIds.isEmpty)
50+
assert(listener.stageIdToGraph.isEmpty)
51+
assert(listener.jobIds.isEmpty)
52+
assert(listener.stageIds.isEmpty)
53+
54+
// Run a few jobs, but not enough for clean up yet
55+
runOneJob(1, listener)
56+
runOneJob(2, listener)
57+
runOneJob(3, listener)
58+
assert(listener.jobIdToStageIds.size === 3)
59+
assert(listener.stageIdToGraph.size === 6)
60+
assert(listener.jobIds.size === 3)
61+
assert(listener.stageIds.size === 6)
62+
63+
// Run a few more, but this time the stages should be cleaned up, but not the jobs
64+
runOneJob(5, listener)
65+
runOneJob(100, listener)
66+
assert(listener.jobIdToStageIds.size === 5)
67+
assert(listener.stageIdToGraph.size === 9)
68+
assert(listener.jobIds.size === 5)
69+
assert(listener.stageIds.size === 9)
70+
71+
// Run a few more, but this time both jobs and stages should be cleaned up
72+
(1 to 100).foreach { _ =>
73+
runOneJob(1, listener)
74+
}
75+
assert(listener.jobIdToStageIds.size === 9)
76+
assert(listener.stageIdToGraph.size === 9)
77+
assert(listener.jobIds.size === 9)
78+
assert(listener.stageIds.size === 9)
79+
80+
// Ensure we clean up old jobs and stages, not arbitrary ones
81+
assert(!listener.jobIdToStageIds.contains(0))
82+
assert(!listener.stageIdToGraph.contains(0))
83+
assert(!listener.stageIds.contains(0))
84+
assert(!listener.jobIds.contains(0))
85+
}
86+
87+
}

0 commit comments

Comments
 (0)