Commit 24d4ad6

author
ding
committed
turn off checkpoint graph and messages in pregel by default
1 parent 5015b44 commit 24d4ad6

3 files changed

+10
-11
lines changed

docs/configuration.md

Lines changed: 2 additions & 2 deletions
@@ -2121,10 +2121,10 @@ showDF(properties, numRows = 200, truncate = FALSE)
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
 <td><code>spark.graphx.pregel.checkpointInterval</code></td>
-<td>10</td>
+<td>-1</td>
 <td>
 Checkpoint interval for graph and message in Pregel. It used to avoid stackOverflowError due to long lineage chains
-after lots of iterations. The checkpoint can be disabled by set as -1.
+after lots of iterations. The checkpoint is disabled by default.
 </td>
 </tr>
 </table>

docs/graphx-programming-guide.md

Lines changed: 5 additions & 6 deletions
@@ -708,9 +708,8 @@ messages remaining.
 > messaging function. These constraints allow additional optimization within GraphX.
 
 The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch*
-of its implementation (note: to avoid stackOverflowError due to long lineage chains, graph and
-messages are periodically checkpoint and the checkpoint interval is set by
-"spark.graphx.pregel.checkpointInterval", it can be disable by set as -1):
+of its implementation (note: to avoid stackOverflowError due to long lineage chains, pregel support periodcally
+checkpoint graph and messages by setting "spark.graphx.pregel.checkpointInterval"):
 
 {% highlight scala %}
 class GraphOps[VD, ED] {
@@ -726,7 +725,7 @@ class GraphOps[VD, ED] {
 var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
 
 // compute the messages
-var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
+var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
 var activeMessages = messages.count()
 // Loop until no messages remain or maxIterations is achieved
 var i = 0
@@ -735,8 +734,8 @@ class GraphOps[VD, ED] {
 g = g.joinVertices(messages)(vprog).cache()
 val oldMessages = messages
 // Send new messages, skipping edges where neither side received a message. We must cache
-// and periodic checkpoint messages so it can be materialized on the next line, and avoid
-// to have a long lineage chain.
+// messages so it can be materialized on the next line, allowing us to uncache the previous
+// iteration.
 messages = GraphXUtils.mapReduceTriplets(
 g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
 activeMessages = messages.count()

graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala

Lines changed: 3 additions & 3 deletions
@@ -126,7 +126,7 @@ object Pregel extends Logging {
 s" but got ${maxIterations}")
 
 val checkpointInterval = graph.vertices.sparkContext.getConf
-.getInt("spark.graphx.pregel.checkpointInterval", 10)
+.getInt("spark.graphx.pregel.checkpointInterval", -1)
 var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg))
 val graphCheckpointer = new PeriodicGraphCheckpointer[VD, ED](
 checkpointInterval, graph.vertices.sparkContext)
@@ -150,8 +150,8 @@ object Pregel extends Logging {
 
 val oldMessages = messages
 // Send new messages, skipping edges where neither side received a message. We must cache
-// and periodic checkpoint messages so it can be materialized on the next line, and avoid
-// to have a long lineage chain.
+// messages so it can be materialized on the next line, allowing us to uncache the previous
+// iteration.
 messages = GraphXUtils.mapReduceTriplets(
 g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))
 // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
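
Note on usage after this change: with the default now -1, Pregel does not checkpoint unless the property is set explicitly. The following is a minimal sketch of opting back in, assuming the job is launched through spark-submit (so the master is supplied externally). The object name, app name, and checkpoint directory are placeholders chosen for illustration and are not part of this commit.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver object; the name is an assumption for illustration.
object PregelCheckpointExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("pregel-checkpoint-example") // placeholder app name
      // Re-enable periodic checkpointing; 10 was the default before this commit.
      .set("spark.graphx.pregel.checkpointInterval", "10")

    val sc = new SparkContext(conf)

    // Checkpoints need a directory to be written to; this path is a placeholder.
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    // ... build a Graph and call graph.pregel(...) here ...

    sc.stop()
  }
}
```

The same property could presumably be passed on the command line instead, for example with `--conf spark.graphx.pregel.checkpointInterval=10`, but the checkpoint directory still has to be set from the application.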
