Skip to content

Commit 9535f40

Browse files
ankurdave authored and rxin committed
Add landmark-based Shortest Path algorithm to graphx.lib
This is a modified version of apache#10.

Author: Ankur Dave <[email protected]>
Author: Andres Perez <[email protected]>

Closes apache#933 from ankurdave/shortestpaths and squashes the following commits:

03a103c [Ankur Dave] Style fixes
7a1ff48 [Ankur Dave] Improve ShortestPaths documentation
d75c8fc [Ankur Dave] Remove unnecessary VD type param, and pass through ED
d983fb4 [Ankur Dave] Fix style errors
60ed8e6 [Andres Perez] Add Shortest-path computations to graphx.lib with unit tests.
1 parent d17d221 commit 9535f40

File tree

2 files changed

+120
-0
lines changed

2 files changed

+120
-0
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.graphx.lib
19+
20+
import org.apache.spark.graphx._
21+
import scala.reflect.ClassTag
22+
23+
/**
 * Computes shortest paths to the given set of landmark vertices, returning a graph where each
 * vertex attribute is a map containing the shortest-path distance to each reachable landmark.
 *
 * Distances are hop counts (each edge contributes 1) and respect edge direction: the value stored
 * at a vertex for a landmark is the length of the shortest directed path from that vertex to the
 * landmark. Landmarks that cannot be reached from a vertex are absent from its map.
 */
object ShortestPaths {
  /** Stores a map from the vertex id of a landmark to the distance to that landmark. */
  type SPMap = Map[VertexId, Int]

  /** Builds an [[SPMap]] from the given (landmark id -> distance) pairs. */
  private def makeMap(x: (VertexId, Int)*): SPMap = Map(x: _*)

  /** Returns a copy of `spmap` in which every distance is one hop longer. */
  private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }

  /**
   * Merges two maps, keeping the smaller distance for landmarks present in both. A landmark that
   * is missing from one side is treated as `Int.MaxValue` there, so the other side's value wins.
   */
  private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap =
    (spmap1.keySet ++ spmap2.keySet).map {
      k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
    }.toMap

  /**
   * Computes shortest paths to the given set of landmark vertices.
   *
   * @tparam ED the edge attribute type (not used in the computation)
   *
   * @param graph the graph for which to compute the shortest paths
   * @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each
   * landmark.
   *
   * @return a graph where each vertex attribute is a map containing the shortest-path distance to
   * each reachable landmark vertex.
   */
  def run[ED: ClassTag](graph: Graph[_, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
    // Every landmark starts out knowing it is 0 hops from itself; all other vertices know nothing.
    val spGraph = graph.mapVertices { (vid, attr) =>
      if (landmarks.contains(vid)) makeMap(vid -> 0) else makeMap()
    }

    // The empty map is the identity for addMaps, so the initial message changes no vertex state.
    val initialMessage = makeMap()

    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      addMaps(attr, msg)
    }

    // Propagate against the edge direction: if the destination can reach a landmark in d hops,
    // the source can reach it in d + 1 hops by following this edge. Sending src -> dst instead
    // would yield distances *from* the landmarks, contradicting the documented contract.
    def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
      val newAttr = incrementMap(edge.dstAttr)
      // Only send when the message would actually improve the source's map, so that messages
      // stop flowing once no distance can be shortened any further.
      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
      else Iterator.empty
    }

    Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
  }
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.graphx.lib
19+
20+
import org.scalatest.FunSuite
21+
22+
import org.apache.spark.SparkContext
23+
import org.apache.spark.SparkContext._
24+
import org.apache.spark.graphx._
25+
import org.apache.spark.graphx.lib._
26+
import org.apache.spark.graphx.util.GraphGenerators
27+
import org.apache.spark.rdd._
28+
29+
/** Checks landmark-based shortest-path distances on a small, symmetric (undirected) graph. */
class ShortestPathsSuite extends FunSuite with LocalSparkContext {

  test("Shortest Path Computations") {
    withSpark { sc =>
      // Expected result: vertex id -> (landmark id -> hop distance), for landmarks 1 and 4.
      // Typed with VertexId keys so the comparison below is between identical types and does
      // not rely on Int/Long cross-type equality.
      val shortestPaths: Set[(VertexId, Map[VertexId, Int])] = Set(
        (1L, Map(1L -> 0, 4L -> 2)), (2L, Map(1L -> 1, 4L -> 2)), (3L, Map(1L -> 2, 4L -> 1)),
        (4L, Map(1L -> 2, 4L -> 0)), (5L, Map(1L -> 1, 4L -> 1)), (6L, Map(1L -> 3, 4L -> 1)))
      // Each edge is added in both directions, making the graph effectively undirected.
      val edgeSeq = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6)).flatMap {
        e => Seq(e, e.swap)
      }
      val edges = sc.parallelize(edgeSeq).map { case (v1, v2) => (v1.toLong, v2.toLong) }
      val graph = Graph.fromEdgeTuples(edges, 1)
      val landmarks = Seq(1, 4).map(_.toLong)
      // Distances are already plain Ints, so the collected vertex attributes can be compared
      // as-is; no per-value unwrapping (and no implicit conversion) is needed.
      val results = ShortestPaths.run(graph, landmarks).vertices.collect
      assert(results.toSet === shortestPaths)
    }
  }

}

0 commit comments

Comments
 (0)