Skip to content

Commit b715933

Browse files
avulanovankurdave
authored andcommitted
[SPARK-9436] [GRAPHX] Pregel simplification patch
Pregel code contains two consecutive joins: ``` g.vertices.innerJoin(messages)(vprog) ... g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) } ``` This can be simplified with one join. ankurdave proposed a patch based on our discussion in the mailing list: https://www.mail-archive.com/devspark.apache.org/msg10316.html Author: Alexander Ulanov <[email protected]> Closes apache#7749 from avulanov/SPARK-9436-pregel and squashes the following commits: 8568e06 [Alexander Ulanov] Pregel simplification patch
1 parent 5340dfa commit b715933

File tree

1 file changed

+10
-13
lines changed

1 file changed

+10
-13
lines changed

graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -127,28 +127,25 @@ object Pregel extends Logging {
127127
var prevG: Graph[VD, ED] = null
128128
var i = 0
129129
while (activeMessages > 0 && i < maxIterations) {
130-
// Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
131-
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
132-
// Update the graph with the new vertices.
130+
// Receive the messages and update the vertices.
133131
prevG = g
134-
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
135-
g.cache()
132+
g = g.joinVertices(messages)(vprog).cache()
136133

137134
val oldMessages = messages
138-
// Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
139-
// get to send messages. We must cache messages so it can be materialized on the next line,
140-
// allowing us to uncache the previous iteration.
141-
messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
142-
// The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
143-
// hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
144-
// vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
135+
// Send new messages, skipping edges where neither side received a message. We must cache
136+
// messages so it can be materialized on the next line, allowing us to uncache the previous
137+
// iteration.
138+
messages = g.mapReduceTriplets(
139+
sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
140+
// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
141+
// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
142+
// and the vertices of g).
145143
activeMessages = messages.count()
146144

147145
logInfo("Pregel finished iteration " + i)
148146

149147
// Unpersist the RDDs hidden by newly-materialized RDDs
150148
oldMessages.unpersist(blocking = false)
151-
newVerts.unpersist(blocking = false)
152149
prevG.unpersistVertices(blocking = false)
153150
prevG.edges.unpersist(blocking = false)
154151
// count the iteration

0 commit comments

Comments
 (0)