@@ -710,7 +710,7 @@ messages remaining.
710710The following is the type signature of the [ Pregel operator] [ GraphOps.pregel ] as well as a * sketch*
711711of its implementation (note: to avoid stackOverflowError due to long lineage chains, graph and
712712messages are periodically checkpoint and the checkpoint interval is set by
713- "spark.graphx.pregel.checkpointInterval"):
713+ "spark.graphx.pregel.checkpointInterval", it can be disable by set as -1 ):
714714
715715{% highlight scala %}
716716class GraphOps[ VD, ED] {
@@ -723,30 +723,22 @@ class GraphOps[VD, ED] {
723723 mergeMsg: (A, A) => A)
724724 : Graph[ VD, ED] = {
725725 // Receive the initial message at each vertex
726- var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg))
727- val graphCheckpointer = new PeriodicGraphCheckpointer[ VD, ED] (
728- checkpointInterval, graph.vertices.sparkContext)
729- graphCheckpointer.update(g)
726+ var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
730727
731728 // compute the messages
732- var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
733- val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
734- checkpointInterval, graph.vertices.sparkContext)
735- messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
729+ var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
736730 var activeMessages = messages.count()
737731 // Loop until no messages remain or maxIterations is achieved
738732 var i = 0
739733 while (activeMessages > 0 && i < maxIterations) {
740734 // Receive the messages and update the vertices.
741- g = g.joinVertices(messages)(vprog)
742- graphCheckpointer.update(g)
735+ g = g.joinVertices(messages)(vprog).cache()
743736 val oldMessages = messages
744737 // Send new messages, skipping edges where neither side received a message. We must cache
745738 // and periodic checkpoint messages so it can be materialized on the next line, and avoid
746739 // to have a long lineage chain.
747740 messages = GraphXUtils.mapReduceTriplets(
748- g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
749- messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
741+ g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
750742 activeMessages = messages.count()
751743 i += 1
752744 }
0 commit comments