Conversation

@tien-dungle
Contributor

The change here is to keep the cached RDDs in the graph object so that, when `graph.unpersist()` is called, these RDDs are correctly unpersisted.

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory
import org.apache.spark.graphx.util.GraphGenerators

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
graph.cache().numEdges

graph.unpersist()

sc.getPersistentRDDs.foreach(r => println(r._2.toString))
```
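
For context, here is a minimal sketch of the pattern the description refers to (a hypothetical `CachedGraph` class, not the actual `GraphImpl` change): the object that caches RDDs also keeps references to them, so `unpersist()` can reach and release all of them. Without that bookkeeping, the `sc.getPersistentRDDs` call above still lists the graph's internal RDDs after `graph.unpersist()`.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical illustration of the fix's idea: hold references to every
// RDD cached during construction so unpersist() can release them all.
class CachedGraph[V, E](val vertices: RDD[V], val edges: RDD[E]) {
  // Cache both underlying RDDs up front and remember them in the object.
  vertices.cache()
  edges.cache()

  // Unpersist exactly the RDDs this object pinned in the block manager.
  def unpersist(blocking: Boolean = true): this.type = {
    vertices.unpersist(blocking)
    edges.unpersist(blocking)
    this
  }
}
```

After calling `unpersist()` on such an object, `sc.getPersistentRDDs` should no longer report its vertex and edge RDDs.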

@srowen
Member

srowen commented Jul 17, 2015

@ankurdave @jegonzal

@SparkQA

SparkQA commented Jul 17, 2015

Test build #1095 has finished for PR 7469 at commit 8d87997.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • logDebug("isMulticlass = " + metadata.isMulticlass)
    • * (i.e., if isMulticlass && isSpaceSufficientForAllCategoricalSplits),
    • logDebug("isMulticlass = " + metadata.isMulticlass)
    • abstract class UnsafeProjection extends Projection
    • case class FromUnsafeProjection(fields: Seq[DataType]) extends Projection
    • abstract class BaseProjection extends Projection
    • class SpecificProjection extends $
    • class SpecificProjection extends $

@asfgit asfgit closed this in 587c315 Jul 17, 2015
asfgit pushed a commit that referenced this pull request Jul 17, 2015
The change here is to keep the cached RDDs in the graph object so that, when `graph.unpersist()` is called, these RDDs are correctly unpersisted.

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory
import org.apache.spark.graphx.util.GraphGenerators

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
graph.cache().numEdges

graph.unpersist()

sc.getPersistentRDDs.foreach(r => println(r._2.toString))
```

Author: tien-dungle <[email protected]>

Closes #7469 from tien-dungle/SPARK-9109_Graphx-unpersist and squashes the following commits:

8d87997 [tien-dungle] Keep the cached edge in the graph

(cherry picked from commit 587c315)
Signed-off-by: Ankur Dave <[email protected]>
@ankurdave
Contributor

Thanks for finding the leak here. Merged into master and branch-1.4.

@srowen
Member

srowen commented Jul 19, 2015

Thanks @ankurdave -- you can follow this up by resolving the issue (already done now)

@ankurdave
Contributor

Oh, thanks @srowen.
