Commit f458c83

Revert "[SPARK-1552] Fix type comparison bug in mapVertices and outerJoinVertices"
This reverts commit 16d6af8.
1 parent 16d6af8 commit f458c83

8 files changed: +39 −136 lines changed

docs/graphx-programming-guide.md

Lines changed: 1 addition & 16 deletions
@@ -320,7 +320,6 @@ class Graph[VD, ED] {
   def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
   // Transform vertex and edge attributes ==========================================================
   def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
-  def mapVerticesConserve(map: (VertexID, VD) => VD): Graph[VD, ED]
   def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
   def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
   def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
@@ -339,9 +338,6 @@ class Graph[VD, ED] {
   def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])
       (mapFunc: (VertexID, VD, Option[U]) => VD2)
     : Graph[VD2, ED]
-  def outerJoinVerticesConserve[U](other: RDD[(VertexID, U)])
-      (mapFunc: (VertexID, VD, Option[U]) => VD)
-    : Graph[VD, ED]
   // Aggregate information about adjacent triplets =================================================
   def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
   def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
@@ -373,7 +369,6 @@ graph contains the following:
 {% highlight scala %}
 class Graph[VD, ED] {
   def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
-  def mapVerticesConserve(map: (VertexId, VD) => VD): Graph[VD, ED]
   def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
   def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
 }
@@ -397,10 +392,6 @@ val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
 
 [Graph.mapVertices]: api/scala/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]
 
-When a call to `mapVertices` would not change the vertex attribute type, use the
-`mapVerticesConserve` operator for better performance. This version of the operator avoids moving
-unchanged vertex attributes when updating the triplets view.
-
 These operators are often used to initialize the graph for a particular computation or project away
 unnecessary properties. For example, given a graph with the out-degrees as the vertex properties
 (we describe how to construct such a graph later), we initialize it for PageRank:
@@ -515,8 +506,6 @@ class Graph[VD, ED] {
     : Graph[VD, ED]
   def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
     : Graph[VD2, ED]
-  def outerJoinVerticesConserve[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD)
-    : Graph[VD, ED]
 }
 {% endhighlight %}
 
@@ -544,10 +533,6 @@ property type. Because not all vertices may have a matching value in the input
 function takes an `Option` type. For example, we can setup a graph for PageRank by initializing
 vertex properties with their `outDegree`.
 
-Similarly to `mapVerticesConserve`, when a call to `outerJoinVertices` would not change the vertex
-attribute type, use the `outerJoinVerticesConserve` operator for better performance. This version of
-the operator avoids moving unchanged vertex attributes when updating the triplets view.
-
 [Graph.outerJoinVertices]: api/scala/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]
 
 
@@ -763,7 +748,7 @@ class GraphOps[VD, ED] {
     // Run the vertex program on all vertices that receive messages
     val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
     // Merge the new vertex values back into the graph
-    g = g.outerJoinVerticesConserve(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
+    g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
     // Send Messages: ------------------------------------------------------------------------------
     // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
    // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
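
The guide prose removed above explained the conserve variants purely as a performance shortcut for type-preserving updates. For reference, a minimal sketch of the two surviving operators in the PageRank-initialization style the guide describes; `graph` is an assumed `Graph[String, String]` input, and the variable names are illustrative, not from this commit:

```scala
import org.apache.spark.graphx._

// Attach out-degrees as vertex values (0 for vertices with no out-edges).
// This changes the vertex attribute type from String to Int.
val degreeGraph: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees) { (vid, _, degOpt) => degOpt.getOrElse(0) }

// Initialize for PageRank as in the guide's example: weight each edge by
// 1 / srcOutDegree and start every vertex at rank 1.0.
val pagerankGraph: Graph[Double, Double] = degreeGraph
  .mapTriplets(t => 1.0 / t.srcAttr)
  .mapVertices((vid, _) => 1.0)
```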

graphx/src/main/scala/org/apache/spark/graphx/Graph.scala

Lines changed: 0 additions & 32 deletions
@@ -130,19 +130,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    */
   def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
 
-  /**
-   * Transforms each vertex attribute in the graph using the map function. Like [[mapVertices]], but
-   * since the type is conserved, is able to avoid moving unchanged vertex attributes when updating
-   * the triplets view.
-   *
-   * @note The new graph has the same structure. As a consequence the underlying index structures
-   * can be reused.
-   *
-   * @param map the function from a vertex object to a new vertex value of the same type
-   *
-   */
-  def mapVerticesConserve(map: (VertexId, VD) => VD): Graph[VD, ED]
-
   /**
    * Transforms each edge attribute in the graph using the map function. The map function is not
    * passed the vertex value for the vertices adjacent to the edge. If vertex values are desired,
@@ -354,25 +341,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
       (mapFunc: (VertexId, VD, Option[U]) => VD2)
     : Graph[VD2, ED]
 
-  /**
-   * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`. Like
-   * [[outerJoinVertices]], but since the type is conserved, is able to avoid moving unchanged
-   * vertex attributes when updating the triplets view.
-   *
-   * The input table should contain at most one entry for each vertex. If no entry in `other` is
-   * provided for a particular vertex in the graph, the map function receives `None`.
-   *
-   * @tparam U the type of entry in the table of updates
-   *
-   * @param other the table to join with the vertices in the graph.
-   * The table should contain at most one entry for each vertex.
-   * @param mapFunc the function used to compute the new vertex values. The map function is invoked
-   * for all vertices, even those that do not have a corresponding entry in the table. It must
-   * conserve the original vertex attribute type.
-   */
-  def outerJoinVerticesConserve[U: ClassTag](other: RDD[(VertexId, U)])
-      (mapFunc: (VertexId, VD, Option[U]) => VD): Graph[VD, ED]
-
   /**
    * The associated [[GraphOps]] object.
    */

graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala

Lines changed: 1 addition & 1 deletion
@@ -213,7 +213,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
         case None => data
       }
     }
-    graph.outerJoinVerticesConserve(table)(uf)
+    graph.outerJoinVertices(table)(uf)
   }
 
   /**
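
The two changed lines above sit at the tail of `GraphOps.joinVertices`. A sketch of the whole method as it reads after the revert, reconstructed from the visible context lines; the signature is assumed from the GraphX API of this era and is not shown in the hunk:

```scala
// joinVertices delegates to outerJoinVertices with a wrapper that leaves
// vertices without a matching table entry unchanged, so the vertex attribute
// type is preserved by construction. Assumes GraphOps' imports (ClassTag,
// RDD, VertexId) are in scope.
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
  : Graph[VD, ED] = {
  val uf = (id: VertexId, data: VD, o: Option[U]) => {
    o match {
      case Some(u) => mapFunc(id, data, u)
      case None => data
    }
  }
  graph.outerJoinVertices(table)(uf)
}
```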

graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala

Lines changed: 2 additions & 2 deletions
@@ -119,7 +119,7 @@ object Pregel extends Logging {
       mergeMsg: (A, A) => A)
     : Graph[VD, ED] =
   {
-    var g = graph.mapVerticesConserve((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
+    var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
     // compute the messages
     var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
     var activeMessages = messages.count()
@@ -131,7 +131,7 @@ object Pregel extends Logging {
       val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
       // Update the graph with the new vertices.
       prevG = g
-      g = g.outerJoinVerticesConserve(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
+      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
       g.cache()
 
       val oldMessages = messages
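
Pregel's vertex program has type `(VertexId, VD, A) => VD`, so both calls above preserve the vertex attribute type and, after this revert, depend on the `classTag` check in `GraphImpl` to select incremental replication. A hedged usage sketch in the style of the GraphX guide's single-source shortest paths example; `graph: Graph[Long, Double]` and `sourceId: VertexId` are assumed inputs, and the two-argument `Pregel(...)` call relies on this era's default arguments:

```scala
import org.apache.spark.graphx._

// Distances start at 0 for the source and infinity everywhere else.
val initialGraph: Graph[Double, Double] =
  graph.mapVertices((vid, _) => if (vid == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = Pregel(initialGraph, Double.PositiveInfinity)(
  // vprog: keep the shorter of the current and incoming distance (type-preserving)
  (vid, dist, newDist) => math.min(dist, newDist),
  // sendMsg: relax edges that offer a shorter path to their destination
  triplet =>
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    },
  // mergeMsg: combine competing distances by taking the minimum
  (a, b) => math.min(a, b))
```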

graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala

Lines changed: 25 additions & 25 deletions
@@ -101,17 +101,18 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   }
 
   override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
-    // The map does not conserve type, so we must re-replicate all vertices
-    GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges)
-  }
-
-  override def mapVerticesConserve(f: (VertexId, VD) => VD): Graph[VD, ED] = {
-    vertices.cache()
-    // The map conserves type, so we can use incremental replication
-    val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
-    val changedVerts = vertices.diff(newVerts)
-    val newReplicatedVertexView = replicatedVertexView.updateVertices(changedVerts)
-    new GraphImpl(newVerts, newReplicatedVertexView)
+    if (classTag[VD] equals classTag[VD2]) {
+      vertices.cache()
+      // The map preserves type, so we can use incremental replication
+      val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
+      val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
+      val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
+        .updateVertices(changedVerts)
+      new GraphImpl(newVerts, newReplicatedVertexView)
+    } else {
+      // The map does not preserve type, so we must re-replicate all vertices
+      GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges)
+    }
   }
 
   override def mapEdges[ED2: ClassTag](
@@ -228,20 +229,19 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
       (other: RDD[(VertexId, U)])
       (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] = {
-    // updateF does not conserve type, so we must re-replicate all vertices
-    val newVerts = vertices.leftJoin(other)(updateF)
-    GraphImpl(newVerts, replicatedVertexView.edges)
-  }
-
-  override def outerJoinVerticesConserve[U: ClassTag]
-      (other: RDD[(VertexId, U)])
-      (updateF: (VertexId, VD, Option[U]) => VD): Graph[VD, ED] = {
-    vertices.cache()
-    // updateF conserves type, so we can use incremental replication
-    val newVerts = vertices.leftJoin(other)(updateF).cache()
-    val changedVerts = vertices.diff(newVerts)
-    val newReplicatedVertexView = replicatedVertexView.updateVertices(changedVerts)
-    new GraphImpl(newVerts, newReplicatedVertexView)
+    if (classTag[VD] equals classTag[VD2]) {
+      vertices.cache()
+      // updateF preserves type, so we can use incremental replication
+      val newVerts = vertices.leftJoin(other)(updateF).cache()
+      val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
+      val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
+        .updateVertices(changedVerts)
+      new GraphImpl(newVerts, newReplicatedVertexView)
+    } else {
+      // updateF does not preserve type, so we must re-replicate all vertices
+      val newVerts = vertices.leftJoin(other)(updateF)
+      GraphImpl(newVerts, replicatedVertexView.edges)
+    }
   }
 
   /** Test whether the closure accesses the the attribute with name `attrName`. */
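
The restored guard compares erased `ClassTag`s, which is exactly the comparison SPARK-1552 had flagged: two vertex types with the same erased class (for example `Option[Integer]` and `Option[Double]`) satisfy the check, so a type-changing map can be routed down the incremental-replication path. A minimal sketch of the blind spot, mirroring the regression test removed from GraphSuite below:

```scala
import scala.reflect.classTag

// ClassTag equality compares only the runtime (erased) class, so these two
// tags are equal even though the element types differ: both erase to Option.
val intOptTag = classTag[Option[java.lang.Integer]]
val doubleOptTag = classTag[Option[java.lang.Double]]
println(intOptTag equals doubleOptTag)  // prints: true
```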

graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala

Lines changed: 4 additions & 4 deletions
@@ -78,7 +78,7 @@ object SVDPlusPlus {
       et => Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))),
       (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
 
-    g = g.outerJoinVerticesConserve(t0) {
+    g = g.outerJoinVertices(t0) {
       (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
        msg: Option[(Long, Double)]) =>
        (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
@@ -112,7 +112,7 @@ object SVDPlusPlus {
       val t1 = g.mapReduceTriplets(
         et => Iterator((et.srcId, et.dstAttr._2)),
         (g1: DoubleMatrix, g2: DoubleMatrix) => g1.addColumnVector(g2))
-      g = g.outerJoinVerticesConserve(t1) {
+      g = g.outerJoinVertices(t1) {
         (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
          msg: Option[DoubleMatrix]) =>
           if (msg.isDefined) (vd._1, vd._1
@@ -125,7 +125,7 @@ object SVDPlusPlus {
         mapTrainF(conf, u),
         (g1: (DoubleMatrix, DoubleMatrix, Double), g2: (DoubleMatrix, DoubleMatrix, Double)) =>
           (g1._1.addColumnVector(g2._1), g1._2.addColumnVector(g2._2), g1._3 + g2._3))
-      g = g.outerJoinVerticesConserve(t2) {
+      g = g.outerJoinVertices(t2) {
         (vid: VertexId,
          vd: (DoubleMatrix, DoubleMatrix, Double, Double),
          msg: Option[(DoubleMatrix, DoubleMatrix, Double)]) =>
@@ -149,7 +149,7 @@ object SVDPlusPlus {
     }
     g.cache()
     val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
-    g = g.outerJoinVerticesConserve(t3) {
+    g = g.outerJoinVertices(t3) {
       (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double), msg: Option[Double]) =>
         if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
     }

graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala

Lines changed: 4 additions & 6 deletions
@@ -48,9 +48,9 @@ object StronglyConnectedComponents {
       iter += 1
       do {
         numVertices = sccWorkGraph.numVertices
-        sccWorkGraph = sccWorkGraph.outerJoinVerticesConserve(sccWorkGraph.outDegrees) {
+        sccWorkGraph = sccWorkGraph.outerJoinVertices(sccWorkGraph.outDegrees) {
           (vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true)
-        }.outerJoinVerticesConserve(sccWorkGraph.inDegrees) {
+        }.outerJoinVertices(sccWorkGraph.inDegrees) {
           (vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true)
         }.cache()
 
@@ -60,16 +60,14 @@ object StronglyConnectedComponents {
           .mapValues { (vid, data) => data._1}
 
         // write values to sccGraph
-        sccGraph = sccGraph.outerJoinVerticesConserve(finalVertices) {
+        sccGraph = sccGraph.outerJoinVertices(finalVertices) {
           (vid, scc, opt) => opt.getOrElse(scc)
         }
         // only keep vertices that are not final
         sccWorkGraph = sccWorkGraph.subgraph(vpred = (vid, data) => !data._2).cache()
       } while (sccWorkGraph.numVertices < numVertices)
 
-      sccWorkGraph = sccWorkGraph.mapVerticesConserve {
-        case (vid, (color, isFinal)) => (vid, isFinal)
-      }
+      sccWorkGraph = sccWorkGraph.mapVertices{ case (vid, (color, isFinal)) => (vid, isFinal) }
 
       // collect min of all my neighbor's scc values, update if it's smaller than mine
       // then notify any neighbors with scc values larger than mine

graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala

Lines changed: 2 additions & 50 deletions
@@ -150,7 +150,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     withSpark { sc =>
       val n = 5
       val star = starGraph(sc, n)
-      // mapVertices conserving type
+      // mapVertices preserving type
       val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
       assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
       // mapVertices changing type
@@ -159,40 +159,6 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
-  test("mapVertices changing type with same erased type") {
-    withSpark { sc =>
-      val vertices = sc.parallelize(Array[(Long, Option[java.lang.Integer])](
-        (1L, Some(1)),
-        (2L, Some(2)),
-        (3L, Some(3))
-      ))
-      val edges = sc.parallelize(Array(
-        Edge(1L, 2L, 0),
-        Edge(2L, 3L, 0),
-        Edge(3L, 1L, 0)
-      ))
-      val graph0 = Graph(vertices, edges)
-      // Trigger initial vertex replication
-      graph0.triplets.foreach(x => {})
-      // Change type of replicated vertices, but conserve erased type
-      val graph1 = graph0.mapVertices {
-        case (vid, integerOpt) => integerOpt.map((x: java.lang.Integer) => (x.toDouble): java.lang.Double)
-      }
-      // Access replicated vertices, exposing the erased type
-      val graph2 = graph1.mapTriplets(t => t.srcAttr.get)
-      assert(graph2.edges.map(_.attr).collect.toSet === Set[java.lang.Double](1.0, 2.0, 3.0))
-    }
-  }
-
-  test("mapVerticesConserve") {
-    withSpark { sc =>
-      val n = 5
-      val star = starGraph(sc, n)
-      val mappedVAttrs = star.mapVerticesConserve((vid, attr) => attr + "2")
-      assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
-    }
-  }
-
   test("mapEdges") {
     withSpark { sc =>
       val n = 3
@@ -331,16 +297,14 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     withSpark { sc =>
       val n = 5
       val reverseStar = starGraph(sc, n).reverse.cache()
-
       // outerJoinVertices changing type
       val reverseStarDegrees =
         reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
       val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
         et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
         (a: Int, b: Int) => a + b).collect.toSet
       assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
-
-      // outerJoinVertices conserving type
+      // outerJoinVertices preserving type
       val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
       val newReverseStar =
         reverseStar.outerJoinVertices(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") }
@@ -349,18 +313,6 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
-  test("outerJoinVerticesConserve") {
-    withSpark { sc =>
-      val n = 5
-      val reverseStar = starGraph(sc, n).reverse.cache()
-      val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
-      val newReverseStar =
-        reverseStar.outerJoinVerticesConserve(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") }
-      assert(newReverseStar.vertices.map(_._2).collect.toSet ===
-        (0 to n).map(x => "v%d".format(x)).toSet)
-    }
-  }
-
   test("more edge partitions than vertex partitions") {
     withSpark { sc =>
       val verts = sc.parallelize(List((1: VertexId, "a"), (2: VertexId, "b")), 1)
