From e68588e3ab325a5263228424bf703d41999bd01d Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 26 Jun 2014 00:39:13 -0700 Subject: [PATCH 1/2] adding simple chain test --- .../spark/graphx/lib/PageRankSuite.scala | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index fc491ae327c2a..9e127dbe69829 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -67,6 +67,24 @@ class PageRankSuite extends FunSuite with LocalSparkContext { .map { case (id, error) => error }.sum } + test("Simple Chain") { + withSpark { sc => + val chain1 = Array((0, 1), (1, 2), (2, 3), (4, 3)) + val rawEdges = sc.parallelize(chain1, 1).map { case (s, d) => (s.toLong, d.toLong)} + val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache() + val resetProb = 0.15 + val tol = 0.0001 + val numIter = 10 + val errorTol = 1.0e-5 + val vertices = chain.staticPageRank(numIter, resetProb).vertices.collect + val normalizer = vertices.map { case (id, p) => p }.sum + val normalizedRanks = vertices.map { case (id, p) => (id, p / normalizer) } + normalizedRanks.foreach(println(_)) + // val dynamicRanks = chain.pageRank(tol, resetProb).vertices.collect + } + } + + test("Star PageRank") { withSpark { sc => val nVertices = 100 @@ -96,7 +114,6 @@ class PageRankSuite extends FunSuite with LocalSparkContext { } // end of test Star PageRank - test("Grid PageRank") { withSpark { sc => val rows = 10 From 849848311f4b781a3e62345aeb83a8f8f0837c03 Mon Sep 17 00:00:00 2001 From: "Joseph E. Gonzalez" Date: Thu, 26 Jun 2014 01:14:54 -0700 Subject: [PATCH 2/2] improving test quality --- .../spark/graphx/lib/PageRankSuite.scala | 174 ++++++++++++------ 1 file changed, 116 insertions(+), 58 deletions(-) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index 9e127dbe69829..3d04346d4f067 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -17,8 +17,9 @@ package org.apache.spark.graphx.lib -import org.scalatest.FunSuite +import org.scalatest.{Matchers, FunSuite} +import collection.mutable.MutableList import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.graphx._ @@ -26,9 +27,12 @@ import org.apache.spark.graphx.lib._ import org.apache.spark.graphx.util.GraphGenerators import org.apache.spark.rdd._ -object GridPageRank { - def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = { - val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int]) +import scala.collection.mutable + +object TruePageRank { + + def grid(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = { + val inNbrs = Array.fill(nRows * nCols)(mutable.MutableList.empty[Int]) val outDegree = Array.fill(nRows * nCols)(0) // Convert row column address into vertex ids (row major order) def sub2ind(r: Int, c: Int): Int = r * nCols + c @@ -44,110 +48,164 @@ object GridPageRank { inNbrs(sub2ind(r,c+1)) += ind } } + val vids = 0 until (nRows * nCols) + pr(vids.zip(inNbrs).toMap, vids.zip(outDegree).toMap, nIter, resetProb) + } + + def edges(edges: Array[(Int, Int)], nIter: Int, resetProb: Double) = { + val vids = edges.flatMap { case (s,d) => Iterator(s,d) }.toSet + val inNbrs = vids.map(id => (id, mutable.MutableList.empty[Int])).toMap + val outDegree = mutable.Map.empty[Int, Int] + for ((s,d) <- edges) { + outDegree(s) = outDegree.getOrElse(s,0) + 1 + inNbrs(d) += s + } + pr(inNbrs, outDegree, nIter, resetProb) + } + + def pr(inNbrs: collection.Map[Int, mutable.MutableList[Int]], + outDegree: collection.Map[Int, Int], nIter: Int, resetProb: Double) = { + val nVerts = inNbrs.size // compute the pagerank - var pr = Array.fill(nRows * nCols)(resetProb) + var pr = Array.fill(nVerts)(resetProb) for (iter <- 0 until nIter) { val oldPr = pr - pr = new Array[Double](nRows * nCols) - for (ind <- 0 until (nRows * nCols)) { + pr = new Array[Double](nVerts) + for (ind <- 0 until nVerts) { pr(ind) = resetProb + (1.0 - resetProb) * inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum } } - (0L until (nRows * nCols)).zip(pr) + val normalizer = pr.sum + (0L until nVerts).zip(pr.map(p => p / normalizer)) } } -class PageRankSuite extends FunSuite with LocalSparkContext { +class PageRankSuite extends FunSuite with LocalSparkContext with Matchers { def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = { a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) } .map { case (id, error) => error }.sum } - test("Simple Chain") { + def normalizePr(pr: Array[(VertexId, Double)]) = { + val normalizer = pr.map { case (v, p) => p }.sum + pr.map { case (v, p) => (v, p / normalizer) } + } + + test("Simple Chain Static PageRank") { withSpark { sc => - val chain1 = Array((0, 1), (1, 2), (2, 3), (4, 3)) - val rawEdges = sc.parallelize(chain1, 1).map { case (s, d) => (s.toLong, d.toLong)} + val chainEdges = Array((0, 1), (1, 2), (2, 3), (4, 3)) + val rawEdges = sc.parallelize(chainEdges, 1).map { case (s, d) => (s.toLong, d.toLong)} val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache() val resetProb = 0.15 val tol = 0.0001 val numIter = 10 - val errorTol = 1.0e-5 - val vertices = chain.staticPageRank(numIter, resetProb).vertices.collect - val normalizer = vertices.map { case (id, p) => p }.sum - val normalizedRanks = vertices.map { case (id, p) => (id, p / normalizer) } - normalizedRanks.foreach(println(_)) - // val dynamicRanks = chain.pageRank(tol, resetProb).vertices.collect + val truePr = TruePageRank.edges(chainEdges, numIter, resetProb).toMap + val vertices = normalizePr(chain.staticPageRank(numIter, resetProb).vertices.collect) + val pageranks = vertices.toMap + for (i <- 0 until 4) { + pageranks(i) should equal(truePr(i) +- 1.0e-7) + } } } - test("Star PageRank") { + test("Simple Chain Dynamic PageRank") { withSpark { sc => - val nVertices = 100 - val starGraph = GraphGenerators.starGraph(sc, nVertices).cache() + val chainEdges = Array((0, 1), (1, 2), (2, 3), (4, 3)) + val rawEdges = sc.parallelize(chainEdges, 1).map { case (s, d) => (s.toLong, d.toLong)} + val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache() val resetProb = 0.15 - val errorTol = 1.0e-5 - - val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices - val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache() + val tol = 0.0001 + val numIter = 10 + val truePr = TruePageRank.edges(chainEdges, numIter, resetProb).toMap + val vertices = normalizePr(chain.pageRank(1.0e-10, resetProb).vertices.collect) + val pageranks = vertices.toMap + for (i <- 0 until 4) { + pageranks(i) should equal(truePr(i) +- 1.0e-7) + } + } + } - // Static PageRank should only take 2 iterations to converge - val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) => - if (pr1 != pr2) 1 else 0 - }.map { case (vid, test) => test }.sum - assert(notMatching === 0) - val staticErrors = staticRanks2.map { case (vid, pr) => - val correct = (vid > 0 && pr == resetProb) || - (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5) - if (!correct) 1 else 0 + test("Static Star PageRank") { + withSpark { sc => + val nVertices = 10 + val starGraph = GraphGenerators.starGraph(sc, nVertices).cache() + val resetProb = 0.15 + val staticRanks: VertexRDD[Double] = starGraph.staticPageRank(numIter = 2, resetProb).vertices + // Check the static pagerank + val pageranks: Map[VertexId, Double] = normalizePr(staticRanks.collect()).toMap + val perimeter = (nVertices - 1.0) * resetProb + val center = resetProb + (1.0 - resetProb) * perimeter + val normalizer = perimeter + center + pageranks(0) should equal((center / normalizer) +- 1.0e-7) + for (i <- 1 until nVertices) { + pageranks(i) should equal((resetProb / normalizer) +- 1.0e-7) } - assert(staticErrors.sum === 0) + } + } + - val dynamicRanks = starGraph.pageRank(0, resetProb).vertices.cache() - assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) + test("Dynamic Star PageRank") { + withSpark { sc => + val nVertices = 10 + val starGraph = GraphGenerators.starGraph(sc, nVertices).cache() + val resetProb = 0.15 + val dynamicRanks: VertexRDD[Double] = starGraph.pageRank(1.0e-10, resetProb).vertices + // Check the pagerank values + val pageranks: Map[VertexId, Double] = normalizePr(dynamicRanks.collect()).toMap + val perimeter = (nVertices - 1.0) * resetProb + val center = resetProb + (1.0 - resetProb) * perimeter + val normalizer = perimeter + center + pageranks(0) should equal ((center / normalizer) +- 1.0e-7) + for(i <- 1 until nVertices) { + pageranks(i) should equal ((resetProb / normalizer) +- 1.0e-7) + } } } // end of test Star PageRank - test("Grid PageRank") { + test("Grid Static PageRank") { withSpark { sc => - val rows = 10 - val cols = 10 + val rows = 5 + val cols = 5 val resetProb = 0.15 val tol = 0.0001 - val numIter = 50 + val numIter = 20 val errorTol = 1.0e-5 + val truePr = TruePageRank.grid(rows, cols, numIter, resetProb).toMap val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache() - - val staticRanks = gridGraph.staticPageRank(numIter, resetProb).vertices.cache() - val dynamicRanks = gridGraph.pageRank(tol, resetProb).vertices.cache() - val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))).cache() - - assert(compareRanks(staticRanks, referenceRanks) < errorTol) - assert(compareRanks(dynamicRanks, referenceRanks) < errorTol) + val vertices = gridGraph.staticPageRank(numIter, resetProb).vertices.cache() + val pageranks: Map[VertexId, Double] = normalizePr(vertices.collect()).toMap + for ((k,pr) <- truePr) { + pageranks(k) should equal ( pr +- 1.0e-5) + } } } // end of Grid PageRank - test("Chain PageRank") { + test("Grid Dynamic PageRank") { withSpark { sc => - val chain1 = (0 until 9).map(x => (x, x+1) ) - val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) } - val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache() + val rows = 5 + val cols = 5 val resetProb = 0.15 val tol = 0.0001 - val numIter = 10 + val numIter = 20 val errorTol = 1.0e-5 + val truePr = TruePageRank.grid(rows, cols, numIter, resetProb).toMap + val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache() + val vertices = gridGraph.pageRank(1.0e-10, resetProb).vertices.cache() + val pageranks: Map[VertexId, Double] = normalizePr(vertices.collect()).toMap + // vertices.collect.foreach(println(_)) + for ((k,pr) <- truePr) { + pageranks(k) should equal ( pr +- 1.0e-5) + } + } + } // end of Grid PageRank - val staticRanks = chain.staticPageRank(numIter, resetProb).vertices - val dynamicRanks = chain.pageRank(tol, resetProb).vertices - assert(compareRanks(staticRanks, dynamicRanks) < errorTol) - } - } }