Skip to content

Commit b1feb60

Browse files
ankurdave authored and rxin committed
[SPARK-1991] Support custom storage levels for vertices and edges
This PR adds support for specifying custom storage levels for the vertices and edges of a graph. This enables GraphX to handle graphs larger than memory size by specifying MEMORY_AND_DISK and then repartitioning the graph to use many small partitions, each of which does fit in memory. Spark will then automatically load partitions from disk as needed. The user specifies the desired vertex and edge storage levels when building the graph by passing them to the graph constructor. These are then stored in the `targetStorageLevel` attribute of the VertexRDD and EdgeRDD respectively. Whenever GraphX needs to cache a VertexRDD or EdgeRDD (because it plans to use it more than once, for example), it uses the specified target storage level. Also, when the user calls `Graph#cache()`, the vertices and edges are persisted using their target storage levels. In order to facilitate propagating the target storage levels across VertexRDD and EdgeRDD operations, we remove raw calls to the constructors and instead introduce the `withPartitionsRDD` and `withTargetStorageLevel` methods. I tested this change by running PageRank and triangle count on a severely memory-constrained cluster (1 executor with 300 MB of memory, and a 1 GB graph). Before this PR, these algorithms used to fail with OutOfMemoryErrors. With this PR, and using the DISK_ONLY storage level, they succeed. Author: Ankur Dave <[email protected]> Closes apache#946 from ankurdave/SPARK-1991 and squashes the following commits: ce17d95 [Ankur Dave] Move pickStorageLevel to StorageLevel.fromString ccaf06f [Ankur Dave] Shadow members in withXYZ() methods rather than using underscores c34abc0 [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0 c5ca068 [Ankur Dave] Revert "Exclude all of GraphX from binary compatibility checks" 34bcefb [Ankur Dave] Exclude all of GraphX from binary compatibility checks 6fdd137 [Ankur Dave] [SPARK-1991] Support custom storage levels for vertices and edges
1 parent 894ecde commit b1feb60

File tree

8 files changed

+229
-97
lines changed

8 files changed

+229
-97
lines changed

core/src/main/scala/org/apache/spark/storage/StorageLevel.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,27 @@ object StorageLevel {
147147
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
148148
val OFF_HEAP = new StorageLevel(false, false, true, false)
149149

150+
/**
 * :: DeveloperApi ::
 * Look up one of the predefined StorageLevel constants by its exact, case-sensitive name.
 *
 * @param s the name of a predefined storage level, for example "MEMORY_AND_DISK"
 * @return the predefined StorageLevel constant carrying that name
 * @throws IllegalArgumentException if `s` does not match any of the names handled below
 */
@DeveloperApi
def fromString(s: String): StorageLevel = {
  s match {
    // No persistence at all.
    case "NONE" => NONE
    // Off-heap storage.
    case "OFF_HEAP" => OFF_HEAP
    // Disk-only levels.
    case "DISK_ONLY" => DISK_ONLY
    case "DISK_ONLY_2" => DISK_ONLY_2
    // Memory-only levels.
    case "MEMORY_ONLY" => MEMORY_ONLY
    case "MEMORY_ONLY_2" => MEMORY_ONLY_2
    case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
    case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
    // Memory-and-disk levels.
    case "MEMORY_AND_DISK" => MEMORY_AND_DISK
    case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
    case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
    case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
    // Anything else (including null) is rejected rather than silently defaulted.
    case unknown => throw new IllegalArgumentException("Invalid StorageLevel: " + unknown)
  }
}
170+
150171
/**
151172
* :: DeveloperApi ::
152173
* Create a new StorageLevel object without setting useOffHeap.

graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
2424
import org.apache.spark.storage.StorageLevel
2525

2626
import org.apache.spark.graphx.impl.EdgePartition
27+
import org.apache.spark.graphx.impl.EdgePartitionBuilder
2728

2829
/**
2930
* `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
@@ -32,7 +33,8 @@ import org.apache.spark.graphx.impl.EdgePartition
3233
* `impl.ReplicatedVertexView`.
3334
*/
3435
class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
35-
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
36+
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
37+
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
3638
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
3739

3840
partitionsRDD.setName("EdgeRDD")
@@ -58,6 +60,10 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
5860

5961
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
6062

63+
/**
64+
* Persists the edge partitions at the specified storage level, ignoring any existing target
65+
* storage level.
66+
*/
6167
override def persist(newLevel: StorageLevel): this.type = {
6268
partitionsRDD.persist(newLevel)
6369
this
@@ -68,9 +74,15 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
6874
this
6975
}
7076

77+
/**
 * Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY.
 *
 * @return this EdgeRDD, so that calls can be chained
 */
override def cache(): this.type = {
  // Persist the underlying partitions RDD at the level chosen when this EdgeRDD was built.
  val level = targetStorageLevel
  partitionsRDD.persist(level)
  this
}
82+
7183
private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
7284
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
73-
new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
85+
this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
7486
if (iter.hasNext) {
7587
val (pid, ep) = iter.next()
7688
Iterator(Tuple2(pid, f(pid, ep)))
@@ -118,11 +130,60 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
118130
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
119131
val ed2Tag = classTag[ED2]
120132
val ed3Tag = classTag[ED3]
121-
new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
133+
this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
122134
(thisIter, otherIter) =>
123135
val (pid, thisEPart) = thisIter.next()
124136
val (_, otherEPart) = otherIter.next()
125137
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
126138
})
127139
}
140+
141+
/**
 * Replaces the edge partitions while preserving all other properties of the EdgeRDD; in
 * particular, the returned EdgeRDD carries over this EdgeRDD's `targetStorageLevel`.
 *
 * @param partitionsRDD the new edge partitions (this parameter shadows the member of the same
 *   name)
 * @tparam ED2 the edge attribute type of the new partitions
 * @tparam VD2 the vertex attribute type of the new partitions
 */
private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
    partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
  val inheritedLevel = this.targetStorageLevel
  new EdgeRDD[ED2, VD2](partitionsRDD, inheritedLevel)
}
146+
147+
/**
 * Returns an EdgeRDD over the same edge partitions as this one, but with the given target
 * storage level. EdgeRDDs derived from the result through `withPartitionsRDD` keep that level.
 *
 * Nothing is persisted by this call; to actually cache the edges, call
 * [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
 *
 * @param targetStorageLevel the storage level that `cache()` on the returned EdgeRDD will use
 *   (this parameter shadows the member of the same name)
 */
private[graphx] def withTargetStorageLevel(
    targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
  val currentPartitions = this.partitionsRDD
  new EdgeRDD[ED, VD](currentPartitions, targetStorageLevel)
}
158+
159+
}
160+
161+
object EdgeRDD {
  /**
   * Builds an EdgeRDD from an RDD of edges. Each partition of `edges` is turned into exactly one
   * edge partition, keyed by that partition's index.
   *
   * @param edges the edges to store
   * @tparam ED the edge attribute type
   * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
   */
  def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
    val partitioned = edges.mapPartitionsWithIndex { (pid, edgeIter) =>
      val partBuilder = new EdgePartitionBuilder[ED, VD]
      for (edge <- edgeIter) {
        partBuilder.add(edge.srcId, edge.dstId, edge.attr)
      }
      // Emit a single (partition id, edge partition) pair per input partition.
      Iterator((pid, partBuilder.toEdgePartition))
    }
    fromEdgePartitions(partitioned)
  }

  /**
   * Wraps already-constructed edge partitions in an EdgeRDD. The result uses the constructor's
   * default target storage level.
   *
   * @param edgePartitions pairs of partition id and edge partition
   * @tparam ED the edge attribute type
   * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
   */
  def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
      edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
    new EdgeRDD[ED, VD](edgePartitions)
  }
}

graphx/src/main/scala/org/apache/spark/graphx/Graph.scala

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
8080
@transient val triplets: RDD[EdgeTriplet[VD, ED]]
8181

8282
/**
83-
* Caches the vertices and edges associated with this graph at the specified storage level.
83+
* Caches the vertices and edges associated with this graph at the specified storage level,
84+
* ignoring any target storage levels previously set.
8485
*
8586
* @param newLevel the level at which to cache the graph.
8687
*
@@ -89,9 +90,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
8990
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
9091

9192
/**
92-
* Caches the vertices and edges associated with this graph. This is used to
93-
* pin a graph in memory enabling multiple queries to reuse the same
94-
* construction process.
93+
* Caches the vertices and edges associated with this graph at the previously-specified target
94+
* storage levels, which default to `MEMORY_ONLY`. This is used to pin a graph in memory enabling
95+
* multiple queries to reuse the same construction process.
9596
*/
9697
def cache(): Graph[VD, ED]
9798

@@ -358,20 +359,25 @@ object Graph {
358359
* Construct a graph from a collection of edges encoded as vertex id pairs.
359360
*
360361
* @param rawEdges a collection of edges in (src, dst) form
362+
* @param defaultValue the vertex attributes with which to create vertices referenced by the edges
361363
* @param uniqueEdges if multiple identical edges are found they are combined and the edge
362364
* attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable
363365
* `uniqueEdges`, a [[PartitionStrategy]] must be provided.
366+
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
367+
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
364368
*
365369
* @return a graph with edge attributes containing either the count of duplicate edges or 1
366370
* (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
367371
*/
368372
def fromEdgeTuples[VD: ClassTag](
369373
rawEdges: RDD[(VertexId, VertexId)],
370374
defaultValue: VD,
371-
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
375+
uniqueEdges: Option[PartitionStrategy] = None,
376+
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
377+
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
372378
{
373379
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
374-
val graph = GraphImpl(edges, defaultValue)
380+
val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
375381
uniqueEdges match {
376382
case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
377383
case None => graph
@@ -383,14 +389,18 @@ object Graph {
383389
*
384390
* @param edges the RDD containing the set of edges in the graph
385391
* @param defaultValue the default vertex attribute to use for each vertex
392+
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
393+
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
386394
*
387395
* @return a graph with edge attributes described by `edges` and vertices
388396
* given by all vertices in `edges` with value `defaultValue`
389397
*/
390398
def fromEdges[VD: ClassTag, ED: ClassTag](
391399
edges: RDD[Edge[ED]],
392-
defaultValue: VD): Graph[VD, ED] = {
393-
GraphImpl(edges, defaultValue)
400+
defaultValue: VD,
401+
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
402+
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
403+
GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
394404
}
395405

396406
/**
@@ -405,12 +415,16 @@ object Graph {
405415
* @param edges the collection of edges in the graph
406416
* @param defaultVertexAttr the default vertex attribute to use for vertices that are
407417
* mentioned in edges but not in vertices
418+
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
419+
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
408420
*/
409421
def apply[VD: ClassTag, ED: ClassTag](
410422
vertices: RDD[(VertexId, VD)],
411423
edges: RDD[Edge[ED]],
412-
defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
413-
GraphImpl(vertices, edges, defaultVertexAttr)
424+
defaultVertexAttr: VD = null.asInstanceOf[VD],
425+
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
426+
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
427+
GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
414428
}
415429

416430
/**

graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.graphx
1919

20+
import org.apache.spark.storage.StorageLevel
2021
import org.apache.spark.{Logging, SparkContext}
2122
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
2223

@@ -48,12 +49,16 @@ object GraphLoader extends Logging {
4849
* @param canonicalOrientation whether to orient edges in the positive
4950
* direction
5051
* @param minEdgePartitions the number of partitions for the edge RDD
52+
* @param edgeStorageLevel the desired storage level for the edge partitions
53+
* @param vertexStorageLevel the desired storage level for the vertex partitions
5154
*/
5255
def edgeListFile(
5356
sc: SparkContext,
5457
path: String,
5558
canonicalOrientation: Boolean = false,
56-
minEdgePartitions: Int = 1)
59+
minEdgePartitions: Int = 1,
60+
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
61+
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
5762
: Graph[Int, Int] =
5863
{
5964
val startTime = System.currentTimeMillis
@@ -78,12 +83,13 @@ object GraphLoader extends Logging {
7883
}
7984
}
8085
Iterator((pid, builder.toEdgePartition))
81-
}.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
86+
}.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
8287
edges.count()
8388

8489
logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
8590

86-
GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
91+
GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
92+
vertexStorageLevel = vertexStorageLevel)
8793
} // end of edgeListFile
8894

8995
}

0 commit comments

Comments
 (0)