@@ -28,21 +28,23 @@ object Bagel extends Logging {
2828 /**
2929 * Runs a Bagel program.
3030 * @param sc [[org.apache.spark.SparkContext ]] to use for the program.
31- * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
32- * the vertex id.
33- * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
34- * empty array, i.e. sc.parallelize(Array[K, Message]()).
35- * @param combiner [[org.apache.spark.bagel.Combiner ]] combines multiple individual messages to a given vertex into one
36- * message before sending (which often involves network I/O).
37- * @param aggregator [[org.apache.spark.bagel.Aggregator ]] performs a reduce across all vertices after each superstep,
38- * and provides the result to each vertex in the next superstep.
31+ * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
32+ * Key will be the vertex id.
33+ * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
34+ * this will be an empty array, i.e. sc.parallelize(Array[(K, Message)]()).
35+ * @param combiner [[org.apache.spark.bagel.Combiner ]] combines multiple individual messages to a
36+ * given vertex into one message before sending (which often involves network
37+ * I/O).
38+ * @param aggregator [[org.apache.spark.bagel.Aggregator ]] performs a reduce across all vertices
39+ * after each superstep and provides the result to each vertex in the next
40+ * superstep.
3941 * @param partitioner [[org.apache.spark.Partitioner ]] partitions values by key
4042 * @param numPartitions number of partitions across which to split the graph.
4143 * Default is the default parallelism of the SparkContext
42- * @param storageLevel [[org.apache.spark.storage.StorageLevel ]] to use for caching of intermediate RDDs in each superstep.
43- * Defaults to caching in memory.
44- * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
45- * optional Aggregator and the current superstep,
44+ * @param storageLevel [[org.apache.spark.storage.StorageLevel ]] to use for caching of
45+ * intermediate RDDs in each superstep. Defaults to caching in memory.
46+ * @param compute function that takes a Vertex, optional set of (possibly combined) messages to
47+ * the Vertex, optional Aggregator and the current superstep,
4648 * and returns a set of (Vertex, outgoing Messages) pairs
4749 * @tparam K key
4850 * @tparam V vertex type
@@ -71,7 +73,7 @@ object Bagel extends Logging {
7173 var msgs = messages
7274 var noActivity = false
7375 do {
74- logInfo(" Starting superstep " + superstep+ " ." )
76+ logInfo(" Starting superstep " + superstep + " ." )
7577 val startTime = System .currentTimeMillis
7678
7779 val aggregated = agg(verts, aggregator)
@@ -97,7 +99,8 @@ object Bagel extends Logging {
9799 verts
98100 }
99101
100- /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator ]] and the default storage level */
102+ /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator ]] and the default
103+ * storage level */
101104 def run [K : Manifest , V <: Vertex : Manifest , M <: Message [K ] : Manifest , C : Manifest ](
102105 sc : SparkContext ,
103106 vertices : RDD [(K , V )],
@@ -106,8 +109,8 @@ object Bagel extends Logging {
106109 partitioner : Partitioner ,
107110 numPartitions : Int
108111 )(
109- compute : (V , Option [C ], Int ) => (V , Array [M ])
110- ) : RDD [( K , V )] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL )(compute)
112+ compute : (V , Option [C ], Int ) => (V , Array [M ])) : RDD [( K , V )] = run(sc, vertices, messages,
113+ combiner, numPartitions, DEFAULT_STORAGE_LEVEL )(compute)
111114
112115 /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator ]] */
113116 def run [K : Manifest , V <: Vertex : Manifest , M <: Message [K ] : Manifest , C : Manifest ](
@@ -127,8 +130,8 @@ object Bagel extends Logging {
127130 }
128131
129132 /**
130- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator ]], default [[ org.apache.spark.HashPartitioner ]]
131- * and default storage level
133+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator ]], default
134+ * [[ org.apache.spark.HashPartitioner ]] and default storage level
132135 */
133136 def run [K : Manifest , V <: Vertex : Manifest , M <: Message [K ] : Manifest , C : Manifest ](
134137 sc : SparkContext ,
@@ -138,9 +141,13 @@ object Bagel extends Logging {
138141 numPartitions : Int
139142 )(
140143 compute : (V , Option [C ], Int ) => (V , Array [M ])
141- ): RDD [(K , V )] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL )(compute)
144+ ): RDD [(K , V )] = run(sc, vertices, messages, combiner, numPartitions,
145+ DEFAULT_STORAGE_LEVEL )(compute)
142146
143- /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator ]] and the default [[org.apache.spark.HashPartitioner ]]*/
147+ /**
148+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator ]] and the
149+ * default [[org.apache.spark.HashPartitioner ]]
150+ */
144151 def run [K : Manifest , V <: Vertex : Manifest , M <: Message [K ] : Manifest , C : Manifest ](
145152 sc : SparkContext ,
146153 vertices : RDD [(K , V )],
@@ -158,7 +165,8 @@ object Bagel extends Logging {
158165 }
159166
160167 /**
161- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator ]], default [[org.apache.spark.HashPartitioner ]],
168+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator ]],
169+ * default [[org.apache.spark.HashPartitioner ]],
162170 * [[org.apache.spark.bagel.DefaultCombiner ]] and the default storage level
163171 */
164172 def run [K : Manifest , V <: Vertex : Manifest , M <: Message [K ] : Manifest ](
@@ -171,7 +179,8 @@ object Bagel extends Logging {
171179 ): RDD [(K , V )] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL )(compute)
172180
173181 /**
174- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator ]], the default [[org.apache.spark.HashPartitioner ]]
182+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator ]],
183+ * the default [[org.apache.spark.HashPartitioner ]]
175184 * and [[org.apache.spark.bagel.DefaultCombiner ]]
176185 */
177186 def run [K : Manifest , V <: Vertex : Manifest , M <: Message [K ] : Manifest ](
@@ -227,8 +236,9 @@ object Bagel extends Logging {
227236 })
228237
229238 numMsgs += newMsgs.size
230- if (newVert.active)
239+ if (newVert.active) {
231240 numActiveVerts += 1
241+ }
232242
233243 Some ((newVert, newMsgs))
234244 }.persist(storageLevel)
0 commit comments