diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala index fa533a512d53b..fcfb0cb81b679 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala @@ -17,6 +17,8 @@ package org.apache.spark.graphx.lib +import scala.collection.mutable.ArrayBuffer + import org.apache.spark._ import org.apache.spark.graphx._ import org.apache.spark.graphx.PartitionStrategy._ @@ -146,6 +148,45 @@ object Analytics extends Logging { }.reduce(_ + _) / 3) sc.stop() + case "ShortestPaths" => + var numEPart = 4 + var partitionStrategy: Option[PartitionStrategy] = None + val landmarks: ArrayBuffer[VertexId] = ArrayBuffer.empty + var outFname = "" + options.foreach{ + case ("numEPart", v) => numEPart = v.toInt + case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) + case ("landmark", v) => landmarks += v.toLong + case ("output", v) => outFname = v + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + println("======================================") + println("| Shortest Paths |") + println("======================================") + + if (landmarks.isEmpty) { + println("Specify landmarks by passing --landmark= one or more times.") + sys.exit(1) + } + + val sc = new SparkContext(host, "ShortestPaths(" + fname + ")", conf) + val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + minEdgePartitions = numEPart).cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) + val distances = ShortestPaths.run(graph, landmarks) + + if (outFname.nonEmpty) { + println("Writing output in 'vidlandmarkdistance' format to " + outFname) + distances.vertices.flatMap { + case (vid: VertexId, d: ShortestPaths.SPMap) => d.iterator.map { + case (landmark: VertexId, distance: Int) => + "%d\t%d\t%d".format(vid, landmark, distance) + } + }.saveAsTextFile(outFname) + } + + sc.stop() + case _ => println("Invalid task type.") }