Skip to content

Commit 42dfab7

Browse files
adrian-wangmateiz
authored andcommitted
[SPARK-2661][bagel]unpersist old processed rdd
Unpersist useless rdd during bagel iteration to make full use of memory. Author: Daoyuan <[email protected]> Closes #1519 from adrian-wang/bagelunpersist and squashes the following commits: 182c9dd [Daoyuan] rename var nextUseless to lastRDD 87fd3a4 [Daoyuan] bagel unpersist old processed rdd
1 parent e34922a commit 42dfab7

File tree

1 file changed

+5
-0
lines changed

1 file changed

+5
-0
lines changed

bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ object Bagel extends Logging {
7272
var verts = vertices
7373
var msgs = messages
7474
var noActivity = false
75+
var lastRDD: RDD[(K, (V, Array[M]))] = null
7576
do {
7677
logInfo("Starting superstep " + superstep + ".")
7778
val startTime = System.currentTimeMillis
@@ -83,6 +84,10 @@ object Bagel extends Logging {
8384
val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
8485
val (processed, numMsgs, numActiveVerts) =
8586
comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
87+
if (lastRDD != null) {
88+
lastRDD.unpersist(false)
89+
}
90+
lastRDD = processed
8691

8792
val timeTaken = System.currentTimeMillis - startTime
8893
logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))

0 commit comments

Comments
 (0)