Skip to content

Commit f34f3d7

Browse files
tien-dungleankurdave
authored andcommitted
[SPARK-9109] [GRAPHX] Keep the cached edge in the graph
The change here is to keep the cached RDDs in the graph object so that when the graph.unpersist() is called these RDDs are correctly unpersisted. ```java import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD import org.slf4j.LoggerFactory import org.apache.spark.graphx.util.GraphGenerators // Create an RDD for the vertices val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))) // Create an RDD for edges val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"))) // Define a default user in case there are relationship with missing user val defaultUser = ("John Doe", "Missing") // Build the initial Graph val graph = Graph(users, relationships, defaultUser) graph.cache().numEdges graph.unpersist() sc.getPersistentRDDs.foreach( r => println( r._2.toString)) ``` Author: tien-dungle <[email protected]> Closes apache#7469 from tien-dungle/SPARK-9109_Graphx-unpersist and squashes the following commits: 8d87997 [tien-dungle] Keep the cached edge in the graph (cherry picked from commit 587c315) Signed-off-by: Ankur Dave <[email protected]>
1 parent bb14015 commit f34f3d7

File tree

1 file changed

+7
-2
lines changed

1 file changed

+7
-2
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,9 +332,9 @@ object GraphImpl {
332332
edgeStorageLevel: StorageLevel,
333333
vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
334334
val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
335-
.withTargetStorageLevel(edgeStorageLevel).cache()
335+
.withTargetStorageLevel(edgeStorageLevel)
336336
val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
337-
.withTargetStorageLevel(vertexStorageLevel).cache()
337+
.withTargetStorageLevel(vertexStorageLevel)
338338
GraphImpl(vertexRDD, edgeRDD)
339339
}
340340

@@ -346,9 +346,14 @@ object GraphImpl {
346346
def apply[VD: ClassTag, ED: ClassTag](
347347
vertices: VertexRDD[VD],
348348
edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
349+
350+
vertices.cache()
351+
349352
// Convert the vertex partitions in edges to the correct type
350353
val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
351354
.mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
355+
.cache()
356+
352357
GraphImpl.fromExistingRDDs(vertices, newEdges)
353358
}
354359

0 commit comments

Comments
 (0)