Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 124 additions & 49 deletions graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@

package org.apache.spark.graphx.lib

import org.scalatest.FunSuite

import org.scalatest.{Matchers, FunSuite}
import collection.mutable.MutableList
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._

object GridPageRank {
def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
import scala.collection.mutable

object TruePageRank {

def grid(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
val inNbrs = Array.fill(nRows * nCols)(mutable.MutableList.empty[Int])
val outDegree = Array.fill(nRows * nCols)(0)
// Convert row column address into vertex ids (row major order)
def sub2ind(r: Int, c: Int): Int = r * nCols + c
Expand All @@ -44,93 +48,164 @@ object GridPageRank {
inNbrs(sub2ind(r,c+1)) += ind
}
}
val vids = 0 until (nRows * nCols)
pr(vids.zip(inNbrs).toMap, vids.zip(outDegree).toMap, nIter, resetProb)
}

def edges(edges: Array[(Int, Int)], nIter: Int, resetProb: Double) = {
val vids = edges.flatMap { case (s,d) => Iterator(s,d) }.toSet
val inNbrs = vids.map(id => (id, mutable.MutableList.empty[Int])).toMap
val outDegree = mutable.Map.empty[Int, Int]
for ((s,d) <- edges) {
outDegree(s) = outDegree.getOrElse(s,0) + 1
inNbrs(d) += s
}
pr(inNbrs, outDegree, nIter, resetProb)
}

def pr(inNbrs: collection.Map[Int, mutable.MutableList[Int]],
outDegree: collection.Map[Int, Int], nIter: Int, resetProb: Double) = {
val nVerts = inNbrs.size
// compute the pagerank
var pr = Array.fill(nRows * nCols)(resetProb)
var pr = Array.fill(nVerts)(resetProb)
for (iter <- 0 until nIter) {
val oldPr = pr
pr = new Array[Double](nRows * nCols)
for (ind <- 0 until (nRows * nCols)) {
pr = new Array[Double](nVerts)
for (ind <- 0 until nVerts) {
pr(ind) = resetProb + (1.0 - resetProb) *
inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
}
}
(0L until (nRows * nCols)).zip(pr)
val normalizer = pr.sum
(0L until nVerts).zip(pr.map(p => p / normalizer))
}

}


class PageRankSuite extends FunSuite with LocalSparkContext {
class PageRankSuite extends FunSuite with LocalSparkContext with Matchers {

def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
.map { case (id, error) => error }.sum
}

test("Star PageRank") {
def normalizePr(pr: Array[(VertexId, Double)]) = {
val normalizer = pr.map { case (v, p) => p }.sum
pr.map { case (v, p) => (v, p / normalizer) }
}

test("Simple Chain Static PageRank") {
withSpark { sc =>
val nVertices = 100
val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
val chainEdges = Array((0, 1), (1, 2), (2, 3), (4, 3))
val rawEdges = sc.parallelize(chainEdges, 1).map { case (s, d) => (s.toLong, d.toLong)}
val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
val resetProb = 0.15
val errorTol = 1.0e-5
val tol = 0.0001
val numIter = 10
val truePr = TruePageRank.edges(chainEdges, numIter, resetProb).toMap
val vertices = normalizePr(chain.staticPageRank(numIter, resetProb).vertices.collect)
val pageranks = vertices.toMap
for (i <- 0 until 4) {
pageranks(i) should equal(truePr(i) +- 1.0e-7)
}
}
}


val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices
val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache()
test("Simple Chain Dynamic PageRank") {
withSpark { sc =>
val chainEdges = Array((0, 1), (1, 2), (2, 3), (4, 3))
val rawEdges = sc.parallelize(chainEdges, 1).map { case (s, d) => (s.toLong, d.toLong)}
val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
val resetProb = 0.15
val tol = 0.0001
val numIter = 10
val truePr = TruePageRank.edges(chainEdges, numIter, resetProb).toMap
val vertices = normalizePr(chain.pageRank(1.0e-10, resetProb).vertices.collect)
val pageranks = vertices.toMap
for (i <- 0 until 4) {
pageranks(i) should equal(truePr(i) +- 1.0e-7)
}
}
}

// Static PageRank should only take 2 iterations to converge
val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
if (pr1 != pr2) 1 else 0
}.map { case (vid, test) => test }.sum
assert(notMatching === 0)

val staticErrors = staticRanks2.map { case (vid, pr) =>
val correct = (vid > 0 && pr == resetProb) ||
(vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
if (!correct) 1 else 0
test("Static Star PageRank") {
withSpark { sc =>
val nVertices = 10
val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
val resetProb = 0.15
val staticRanks: VertexRDD[Double] = starGraph.staticPageRank(numIter = 2, resetProb).vertices
// Check the static pagerank
val pageranks: Map[VertexId, Double] = normalizePr(staticRanks.collect()).toMap
val perimeter = (nVertices - 1.0) * resetProb
val center = resetProb + (1.0 - resetProb) * perimeter
val normalizer = perimeter + center
pageranks(0) should equal((center / normalizer) +- 1.0e-7)
for (i <- 1 until nVertices) {
pageranks(i) should equal((resetProb / normalizer) +- 1.0e-7)
}
assert(staticErrors.sum === 0)
}
}


val dynamicRanks = starGraph.pageRank(0, resetProb).vertices.cache()
assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
test("Dynamic Star PageRank") {
withSpark { sc =>
val nVertices = 10
val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
val resetProb = 0.15
val dynamicRanks: VertexRDD[Double] = starGraph.pageRank(1.0e-10, resetProb).vertices
// Check the pagerank values
val pageranks: Map[VertexId, Double] = normalizePr(dynamicRanks.collect()).toMap
val perimeter = (nVertices - 1.0) * resetProb
val center = resetProb + (1.0 - resetProb) * perimeter
val normalizer = perimeter + center
pageranks(0) should equal ((center / normalizer) +- 1.0e-7)
for(i <- 1 until nVertices) {
pageranks(i) should equal ((resetProb / normalizer) +- 1.0e-7)
}
}
} // end of test Star PageRank



test("Grid PageRank") {
test("Grid Static PageRank") {
withSpark { sc =>
val rows = 10
val cols = 10
val rows = 5
val cols = 5
val resetProb = 0.15
val tol = 0.0001
val numIter = 50
val numIter = 20
val errorTol = 1.0e-5
val truePr = TruePageRank.grid(rows, cols, numIter, resetProb).toMap
val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()

val staticRanks = gridGraph.staticPageRank(numIter, resetProb).vertices.cache()
val dynamicRanks = gridGraph.pageRank(tol, resetProb).vertices.cache()
val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))).cache()

assert(compareRanks(staticRanks, referenceRanks) < errorTol)
assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
val vertices = gridGraph.staticPageRank(numIter, resetProb).vertices.cache()
val pageranks: Map[VertexId, Double] = normalizePr(vertices.collect()).toMap
for ((k,pr) <- truePr) {
pageranks(k) should equal ( pr +- 1.0e-5)
}
}
} // end of Grid PageRank


test("Chain PageRank") {
test("Grid Dynamic PageRank") {
withSpark { sc =>
val chain1 = (0 until 9).map(x => (x, x+1) )
val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) }
val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
val rows = 5
val cols = 5
val resetProb = 0.15
val tol = 0.0001
val numIter = 10
val numIter = 20
val errorTol = 1.0e-5
val truePr = TruePageRank.grid(rows, cols, numIter, resetProb).toMap
val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
val vertices = gridGraph.pageRank(1.0e-10, resetProb).vertices.cache()
val pageranks: Map[VertexId, Double] = normalizePr(vertices.collect()).toMap
// vertices.collect.foreach(println(_))
for ((k,pr) <- truePr) {
pageranks(k) should equal ( pr +- 1.0e-5)
}
}
} // end of Grid PageRank

val staticRanks = chain.staticPageRank(numIter, resetProb).vertices
val dynamicRanks = chain.pageRank(tol, resetProb).vertices

assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
}
}
}