Skip to content

Commit d0a5c32

Browse files
jasoncl authored and
srowen committed
[SPARK-12655][GRAPHX] GraphX does not unpersist RDDs
Some VertexRDD and EdgeRDD instances are created during an intermediate step of g.connectedComponents() but are unnecessarily left cached after the method returns. The fix is to unpersist these RDDs once they are no longer in use. A test case is added to confirm the fix for the reported bug. Author: Jason Lee <[email protected]>. Closes #10713 from jasoncl/SPARK-12655.
1 parent fe7246f commit d0a5c32

File tree

3 files changed

+20
-2
lines changed

3 files changed

+20
-2
lines changed

graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -151,7 +151,7 @@ object Pregel extends Logging {
151151
// count the iteration
152152
i += 1
153153
}
154-
154+
messages.unpersist(blocking = false)
155155
g
156156
} // end of apply
157157

graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -47,9 +47,11 @@ object ConnectedComponents {
4747
}
4848
}
4949
val initialMessage = Long.MaxValue
50-
Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
50+
val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
5151
vprog = (id, attr, msg) => math.min(attr, msg),
5252
sendMsg = sendMessage,
5353
mergeMsg = (a, b) => math.min(a, b))
54+
ccGraph.unpersist()
55+
pregelGraph
5456
} // end of connectedComponents
5557
}

graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -428,4 +428,20 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
428428
}
429429
}
430430

431+
test("unpersist graph RDD") {
432+
withSpark { sc =>
433+
val vert = sc.parallelize(List((1L, "a"), (2L, "b"), (3L, "c")), 1)
434+
val edges = sc.parallelize(List(Edge[Long](1L, 2L), Edge[Long](1L, 3L)), 1)
435+
val g0 = Graph(vert, edges)
436+
val g = g0.partitionBy(PartitionStrategy.EdgePartition2D, 2)
437+
val cc = g.connectedComponents()
438+
assert(sc.getPersistentRDDs.nonEmpty)
439+
cc.unpersist()
440+
g.unpersist()
441+
g0.unpersist()
442+
vert.unpersist()
443+
edges.unpersist()
444+
assert(sc.getPersistentRDDs.isEmpty)
445+
}
446+
}
431447
}

0 commit comments

Comments
 (0)