Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,18 @@ class VertexRDD[@specialized VD: ClassTag](
def reverseRoutingTables(): VertexRDD[VD] =
this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))

/** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */
def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = {
val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get)
val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) {
(partIter, routingTableIter) =>
val routingTable =
if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
partIter.map(_.withRoutingTable(routingTable))
}
new VertexRDD(vertexPartitions)
}

/** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
private[graphx] def shipVertexAttributes(
shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}
val edgePartition = builder.toEdgePartition
Iterator((pid, edgePartition))
}, preservesPartitioning = true))
GraphImpl.fromExistingRDDs(vertices, newEdges)
}, preservesPartitioning = true)).cache()
GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
}

override def reverse: Graph[VD, ED] = {
Expand Down Expand Up @@ -277,7 +277,11 @@ object GraphImpl {
GraphImpl(vertexRDD, edgeRDD)
}

/** Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. */
/**
* Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. The
* VertexRDD must already be set up for efficient joins with the EdgeRDD by calling
* `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
*/
def apply[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
Expand All @@ -290,7 +294,8 @@ object GraphImpl {

/**
* Create a graph from a VertexRDD and an EdgeRDD with the same replicated vertex type as the
* vertices.
* vertices. The VertexRDD must already be set up for efficient joins with the EdgeRDD by calling
* `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
*/
def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
Expand Down
10 changes: 10 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
Iterator((part.srcIds ++ part.dstIds).toSet)
}.collect
assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))

// Forming triplets view
val g = Graph(
sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
assert(g.triplets.collect.map(_.toTuple).toSet ===
Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
val gPart = g.partitionBy(EdgePartition2D)
assert(gPart.triplets.collect.map(_.toTuple).toSet ===
Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
}
}

Expand Down