Skip to content

Commit 1654932

Browse files
committed
[SPARK-7984][Graphx]Complex Operators between Graphs: Union
1 parent 164fe2a commit 1654932

File tree

5 files changed

+110
-0
lines changed

5 files changed

+110
-0
lines changed

graphx/src/main/scala/org/apache/spark/graphx/Graph.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,25 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
329329
*/
330330
def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
331331

332+
/**
333+
* Union current Graph with other Graph.
334+
* The union of two graphs G(VG, EG) and H(VH, EH) is the union of their vertex sets
335+
* and their edge families. Which means G u H = (VG u VH, EG u EH).
336+
* @param other the other Graph will union
337+
* @param mergeSameVertexAttr merge same vertex attribute function
338+
* @param mergeSameEdgeAttr merge same edge attribute function
339+
* @tparam VD2 other Graph Vertex Type
340+
* @tparam ED2 other Graph Edge Type
341+
* @tparam VD3 result joined Graph Vertex Type
342+
* @tparam ED3 result joined Graph Edge Type
343+
* @return a graph join withe the two graph's vertex and edge set
344+
*/
345+
def union[VD2: ClassTag, ED2: ClassTag, VD3: ClassTag, ED3: ClassTag](
346+
other: Graph[VD2, ED2],
347+
mergeSameVertexAttr: (Option[VD], Option[VD2]) => VD3,
348+
mergeSameEdgeAttr: (Option[ED], Option[ED2]) => ED3)
349+
: Graph[VD3, ED3]
350+
332351
/**
333352
* Merges multiple edges between two vertices into a single edge. For correct results, the graph
334353
* must have been partitioned using [[partitionBy]].

graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,39 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
178178
new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
179179
}
180180

181+
override def union[VD2: ClassTag, ED2: ClassTag, VD3: ClassTag, ED3: ClassTag] (
182+
other: Graph[VD2, ED2],
183+
mergeSameVertexAttr: (Option[VD], Option[VD2]) => VD3,
184+
mergeSameEdgeAttr: (Option[ED], Option[ED2]) => ED3): Graph[VD3, ED3] = {
185+
186+
val newVertexRDD: RDD[(VertexId, VD3)] = vertices.fullOuterJoin(other.vertices).map {
187+
pair => (pair._1, mergeSameVertexAttr(pair._2._1, pair._2._2))
188+
}.cache()
189+
190+
// convert other EdgeRDD to kv pair RDD
191+
val otherPair = other.edges.mapPartitions {
192+
iter => iter.map { edge => (edge.srcId.toString + edge.dstId.toString, edge) }
193+
}
194+
195+
// full out join the kv pair RDD
196+
val joinedRDD: RDD[Edge[ED3]] = RDD.rddToPairRDDFunctions {
197+
edges.mapPartitions { _.map(edge => (edge.srcId.toString + edge.dstId.toString, edge)) }
198+
}.fullOuterJoin(otherPair).map {
199+
f => {
200+
val curEdge = f._2._1
201+
val otherEdge = f._2._2
202+
val edge = curEdge.getOrElse(otherEdge.get)
203+
val curAttr = if (curEdge.isDefined) Some(curEdge.get.attr) else None
204+
val otherAttr = if (otherEdge.isDefined) Some(otherEdge.get.attr) else None
205+
Edge(edge.srcId, edge.dstId, mergeSameEdgeAttr(curAttr, otherAttr))
206+
}
207+
}
208+
209+
// convert to EdgeRDD and new Graph
210+
val newEdgeRDD: EdgeRDDImpl[ED3, VD3] = EdgeRDD.fromEdges[ED3, VD3](joinedRDD).cache()
211+
new GraphImpl(VertexRDD(newVertexRDD), new ReplicatedVertexView(newEdgeRDD))
212+
}
213+
181214
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
182215
val newEdges = replicatedVertexView.edges.mapEdgePartitions(
183216
(pid, part) => part.groupEdges(merge))
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
1,2
2+
1,3
3+
1,4
4+
2,1
5+
2,3
6+
2,4
7+
3,1
8+
3,2
9+
3,4
10+
4,1
11+
4,2
12+
4,3
13+
5,4
14+
5,6
15+
6,5
16+
6,4
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
5,6
2+
5,7
3+
5,8
4+
6,5
5+
6,7
6+
6,8
7+
7,5
8+
7,6
9+
7,8
10+
8,5
11+
8,6
12+
8,7

graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,4 +428,34 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
428428
}
429429
}
430430

431+
test("union between two graph") {
432+
withSpark { sc =>
433+
val rdd1 = sc.textFile(getClass.getResource("/union_1_test.data").getFile).zipWithIndex().map{
434+
line =>
435+
val fields = line._1.split(",")
436+
Edge(fields(0).trim.toLong, fields(1).trim.toLong, line._2 + 1)
437+
}
438+
val rdd2 = sc.textFile(getClass.getResource("/union_2_test.data").getFile).zipWithIndex().map{
439+
line =>
440+
val fields = line._1.split(",")
441+
Edge(fields(0).trim.toLong, fields(1).trim.toLong, line._2 + 1)
442+
}
443+
444+
val mergeVertex = (a: Option[Int], b: Option[Int]) => a.getOrElse(0) + b.getOrElse(0)
445+
val mergeEdge = (a: Option[Long], b: Option[Long]) => a.getOrElse(0L) + b.getOrElse(0L)
446+
447+
val graph1 = Graph.fromEdges(rdd1, 1)
448+
val graph2 = Graph.fromEdges(rdd2, 2)
449+
val graph3 = graph1.union(graph2, mergeVertex, mergeEdge)
450+
451+
assert(graph1.edges.count() + graph2.edges.count - 2 == graph3.edges.count)
452+
453+
val diff = (graph1.edges.collect() ++ graph2.edges.collect()).diff(graph3.edges.collect())
454+
assert(diff.count(p => p.srcId == 5L || p.srcId == 6L || p.dstId == 5L || p.dstId == 6L) == 4)
455+
456+
val vdiff = graph3.vertices.collect().diff(graph1.vertices.collect ++ graph2.vertices.collect)
457+
assert(vdiff.diff(Array((6, 3), (5, 3))).length == 0)
458+
}
459+
}
460+
431461
}

0 commit comments

Comments
 (0)