1818package org .apache .spark .graphx .lib
1919
2020import org .apache .spark .graphx ._
21+ import scala .reflect .ClassTag
2122
2223object ShortestPaths {
2324 type SPMap = Map [VertexId , Int ] // map of landmarks -> minimum distance to landmark
@@ -33,33 +34,28 @@ object ShortestPaths {
3334 * return an RDD with the map of landmarks to their shortest-path
3435 * lengths.
3536 *
36- * @tparam VD the shortest paths map for the vertex
37- * @tparam ED the incremented shortest-paths map of the originating
38- * vertex (discarded in the computation)
37+ * @tparam ED the edge attribute type (not used in the computation)
3938 *
4039 * @param graph the graph for which to compute the shortest paths
4140 * @param landmarks the list of landmark vertex ids
4241 *
4342 * @return a graph with vertex attributes containing a map of the
4443 * shortest paths to each landmark
4544 */
46- def run [VD , ED ](graph : Graph [VD , ED ], landmarks : Seq [VertexId ])
47- (implicit m1 : Manifest [VD ], m2 : Manifest [ED ]): Graph [SPMap , SPMap ] = {
48-
45+ def run [ED : ClassTag ](graph : Graph [_, ED ], landmarks : Seq [VertexId ]): Graph [SPMap , ED ] = {
4946 val spGraph = graph
5047 .mapVertices { (vid, attr) =>
5148 if (landmarks.contains(vid)) SPMap (vid -> 0 )
5249 else SPMap ()
5350 }
54- .mapTriplets(edge => edge.srcAttr)
5551
5652 val initialMessage = SPMap ()
5753
5854 def vertexProgram (id : VertexId , attr : SPMap , msg : SPMap ): SPMap = {
5955 plus(attr, msg)
6056 }
6157
62- def sendMessage (edge : EdgeTriplet [SPMap , SPMap ]): Iterator [(VertexId , SPMap )] = {
58+ def sendMessage (edge : EdgeTriplet [SPMap , _ ]): Iterator [(VertexId , SPMap )] = {
6359 val newAttr = increment(edge.srcAttr)
6460 if (edge.dstAttr != plus(newAttr, edge.dstAttr)) Iterator ((edge.dstId, newAttr))
6561 else Iterator .empty
0 commit comments