@@ -20,34 +20,39 @@ package org.apache.spark.graphx.lib
2020import org .apache .spark .graphx ._
2121import scala .reflect .ClassTag
2222
23+ /**
24+ * Computes shortest paths to the given set of landmark vertices, returning a graph where each
25+ * vertex attribute is a map containing the shortest-path distance to each reachable landmark.
26+ */
2327object ShortestPaths {
24- type SPMap = Map [VertexId , Int ] // map of landmarks -> minimum distance to landmark
28+ /** Stores a map from the vertex id of a landmark to the distance to that landmark. */
29+ type SPMap = Map [VertexId , Int ]
30+
2531 def SPMap (x : (VertexId , Int )* ) = Map (x : _* )
32+
2633 def increment (spmap : SPMap ): SPMap = spmap.map { case (v, d) => v -> (d + 1 ) }
34+
2735 def plus (spmap1 : SPMap , spmap2 : SPMap ): SPMap =
2836 (spmap1.keySet ++ spmap2.keySet).map {
2937 k => k -> math.min(spmap1.getOrElse(k, Int .MaxValue ), spmap2.getOrElse(k, Int .MaxValue ))
3038 }.toMap
3139
3240 /**
33- * Compute the shortest paths to each landmark for each vertex and
34- * return an RDD with the map of landmarks to their shortest-path
35- * lengths.
41+ * Computes shortest paths to the given set of landmark vertices.
3642 *
3743 * @tparam ED the edge attribute type (not used in the computation)
3844 *
3945 * @param graph the graph for which to compute the shortest paths
40- * @param landmarks the list of landmark vertex ids
46+ * @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each
47+ * landmark.
4148 *
42- * @return a graph with vertex attributes containing a map of the
43- * shortest paths to each landmark
49+ * @return a graph where each vertex attribute is a map containing the shortest-path distance to
50+ * each reachable landmark vertex.
4451 */
4552 def run [ED : ClassTag ](graph : Graph [_, ED ], landmarks : Seq [VertexId ]): Graph [SPMap , ED ] = {
46- val spGraph = graph
47- .mapVertices { (vid, attr) =>
48- if (landmarks.contains(vid)) SPMap (vid -> 0 )
49- else SPMap ()
50- }
53+ val spGraph = graph.mapVertices { (vid, attr) =>
54+ if (landmarks.contains(vid)) SPMap (vid -> 0 ) else SPMap ()
55+ }
5156
5257 val initialMessage = SPMap ()
5358
@@ -61,12 +66,7 @@ object ShortestPaths {
6166 else Iterator .empty
6267 }
6368
64- def messageCombiner (s1 : SPMap , s2 : SPMap ): SPMap = {
65- plus(s1, s2)
66- }
67-
6869 Pregel (spGraph, initialMessage)(
69- vertexProgram, sendMessage, messageCombiner )
70+ vertexProgram, sendMessage, plus )
7071 }
71-
7272}
0 commit comments