diff --git a/graphx/pom.xml b/graphx/pom.xml
index 987b831021a54..cfc259d96cb7e 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -70,6 +70,27 @@
org.apache.spark
spark-test-tags_${scala.binary.version}
+
+ org.scalanlp
+ breeze_${scala.binary.version}
+ 0.11.2
+
+
+
+ junit
+ junit
+
+
+ org.apache.commons
+ commons-math3
+
+
+
+
+ org.apache.commons
+ commons-math3
+
target/scala-${scala.binary.version}/classes
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 9451ff1e5c0e2..e97b1ceb29c08 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -26,6 +26,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.lib._
+import breeze.linalg.{SparseVector => BSV}
+
/**
* Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
* efficient GraphX API. This class is implicitly constructed for each Graph object.
@@ -384,6 +386,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
PageRank.runUntilConvergenceWithOptions(graph, tol, resetProb, Some(src))
}
+ /**
+ * Run parallel personalized PageRank for a given array of source vertices, such
+ * that all random walks are started relative to the source vertices
+ */
+ def staticParallelPersonalizedPageRank(sources: Array[VertexId], numIter: Int,
+ resetProb: Double = 0.15) : Graph[BSV[Double], Double] = {
+ PageRank.runParallelPersonalizedPageRank(graph, numIter, resetProb, sources)
+ }
+
/**
* Run Personalized PageRank for a fixed number of iterations with
* with all iterations originating at the source node
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 52b237fc15093..cd34061415ec9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -23,6 +23,8 @@ import scala.language.postfixOps
import org.apache.spark.Logging
import org.apache.spark.graphx._
+import breeze.linalg.{SparseVector => BSV}
+
/**
* PageRank algorithm implementation. There are two implementations of PageRank implemented.
*
@@ -158,6 +160,82 @@ object PageRank extends Logging {
rankGraph
}
+ /**
+ * Run Personalized PageRank for a fixed number of iterations, for a
+ * set of starting nodes in parallel. Returns a graph with vertex attributes
+ * containing the pagerank relative to all starting nodes (as a sparse vector) and
+ * edge attributes the normalized edge weight
+ *
+ * @tparam VD The original vertex attribute (not used)
+ * @tparam ED The original edge attribute (not used)
+ *
+ * @param graph The graph on which to compute personalized pagerank
+ * @param numIter The number of iterations to run
+ * @param resetProb The random reset probability
+ * @param sources The list of sources to compute personalized pagerank from
+ * @return the graph with vertex attributes
+ * containing the pagerank relative to all starting nodes (as a sparse vector) and
+ * edge attributes the normalized edge weight
+ */
+ def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
+ numIter: Int, resetProb: Double = 0.15,
+ sources: Array[VertexId]): Graph[BSV[Double], Double] = {
+ // TODO if one sources vertex id is outside of the int range
+ // we won't be able to store its activations in a sparse vector
+ val zero = new BSV[Double](Array(), Array(), sources.size)
+ val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
+ val v = new BSV[Double](Array(i), Array(resetProb), sources.size)
+ (vid, v)
+ }.toMap
+ val sc = graph.vertices.sparkContext
+ val sourcesInitMapBC = sc.broadcast(sourcesInitMap)
+ // Initialize the PageRank graph with each edge attribute having
+ // weight 1/outDegree and each source vertex with attribute 1.0.
+ var rankGraph = graph
+ // Associate the degree with each vertex
+ .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
+ // Set the weight on the edges based on the degree
+ .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
+ .mapVertices { (vid, attr) =>
+ if (sourcesInitMapBC.value contains vid) {
+ sourcesInitMapBC.value(vid)
+ } else {
+ zero
+ }
+ }
+
+ var i = 0
+ while (i < numIter) {
+ val prevRankGraph = rankGraph
+ // Propagates the message along outbound edges
+ // and adding start nodes back in with activation resetProb
+ val rankUpdates = rankGraph.aggregateMessages[BSV[Double]](
+ ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
+ (a: BSV[Double], b: BSV[Double]) => a :+ b, TripletFields.Src)
+
+ rankGraph = rankGraph.joinVertices(rankUpdates) {
+ (vid, oldRank, msgSum) =>
+ val popActivations: BSV[Double] = msgSum :* (1.0 - resetProb)
+ val resetActivations = if (sourcesInitMapBC.value contains vid) {
+ sourcesInitMapBC.value(vid)
+ } else {
+ zero
+ }
+ popActivations :+ resetActivations
+ }.cache()
+
+ rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
+ prevRankGraph.vertices.unpersist(false)
+ prevRankGraph.edges.unpersist(false)
+
+ logInfo(s"Parallel Personalized PageRank finished iteration $i.")
+
+ i += 1
+ }
+
+ rankGraph
+ }
+
/**
* Run a dynamic version of PageRank returning a graph with vertex attributes containing the
* PageRank and edge attributes containing the normalized edge weight.
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index bdff31446f8ee..b6305c8d00aba 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -118,11 +118,29 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+ val parallelStaticRanks1 = starGraph
+ .staticParallelPersonalizedPageRank(Array(0), 1, resetProb).mapVertices {
+ case (vertexId, vector) => vector(0)
+ }.vertices.cache()
+ assert(compareRanks(staticRanks1, parallelStaticRanks1) < errorTol)
+
+ val parallelStaticRanks2 = starGraph
+ .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
+ case (vertexId, vector) => vector(0)
+ }.vertices.cache()
+ assert(compareRanks(staticRanks2, parallelStaticRanks2) < errorTol)
+
// We have one outbound edge from 1 to 0
val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb)
.vertices.cache()
val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
+ val otherParallelStaticRanks2 = starGraph
+ .staticParallelPersonalizedPageRank(Array(0, 1), 2, resetProb).mapVertices {
+ case (vertexId, vector) => vector(1)
+ }.vertices.cache()
assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
+ assert(compareRanks(otherStaticRanks2, otherParallelStaticRanks2) < errorTol)
+ assert(compareRanks(otherDynamicRanks, otherParallelStaticRanks2) < errorTol)
}
} // end of test Star PersonalPageRank
@@ -177,6 +195,12 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices
assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+
+ val parallelStaticRanks = chain
+ .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices {
+ case (vertexId, vector) => vector(0)
+ }.vertices.cache()
+ assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
}
}
}
diff --git a/mllib/pom.xml b/mllib/pom.xml
index 70139121d8c78..712603ede5a19 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -68,23 +68,6 @@
${jblas.version}
test
-
- org.scalanlp
- breeze_${scala.binary.version}
- 0.11.2
-
-
-
- junit
- junit
-
-
- org.apache.commons
- commons-math3
-
-
-
org.apache.commons
commons-math3