Skip to content

Commit d7f2edc

Browse files
author
Yves Raimond
committed
Fixing SPARK-11432 - non-uniform initialization for personalized pagerank
1 parent bb5a2af commit d7f2edc

File tree

2 files changed

+31
-11
lines changed

2 files changed

+31
-11
lines changed

graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala

Lines changed: 22 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -104,17 +104,25 @@ object PageRank extends Logging {
104104
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
105105
srcId: Option[VertexId] = None): Graph[Double, Double] =
106106
{
107+
val personalized = srcId isDefined
107108
// Initialize the PageRank graph with each edge attribute having
108-
// weight 1/outDegree and each vertex with attribute 1.0.
109+
// weight 1/outDegree and each vertex with attribute resetProb.
110+
// When running personalized pagerank, only the source vertex
111+
// has an attribute resetProb. All others are set to 0.
109112
var rankGraph: Graph[Double, Double] = graph
110113
// Associate the degree with each vertex
111114
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
112115
// Set the weight on the edges based on the degree
113116
.mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
114117
// Set the vertex attributes to the initial pagerank values
115-
.mapVertices( (id, attr) => resetProb )
118+
.mapVertices( (id, attr) => {
119+
if (personalized) {
120+
if (id == srcId.get) resetProb else 0.0
121+
} else {
122+
resetProb
123+
}
124+
})
116125

117-
val personalized = srcId isDefined
118126
val src: VertexId = srcId.getOrElse(-1L)
119127
def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
120128

@@ -192,6 +200,7 @@ object PageRank extends Logging {
192200
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15,
193201
srcId: Option[VertexId] = None): Graph[Double, Double] =
194202
{
203+
val personalized = srcId.isDefined
195204
// Initialize the pagerankGraph with each edge attribute
196205
// having weight 1/outDegree and each vertex with attribute (0.0, 0.0); a personalized source vertex starts at (resetProb, -Infinity).
197206
val pagerankGraph: Graph[(Double, Double), Double] = graph
@@ -202,10 +211,15 @@ object PageRank extends Logging {
202211
// Set the weight on the edges based on the degree
203212
.mapTriplets( e => 1.0 / e.srcAttr )
204213
// Set the vertex attributes to (initialPR, delta); delta starts at 0 except for a personalized source vertex
205-
.mapVertices( (id, attr) => (0.0, 0.0) )
214+
.mapVertices( (id, attr) => {
215+
if (personalized && id == srcId.get) {
216+
(resetProb, Double.NegativeInfinity)
217+
} else {
218+
(0.0, 0.0)
219+
}
220+
} )
206221
.cache()
207222

208-
val personalized = srcId.isDefined
209223
val src: VertexId = srcId.getOrElse(-1L)
210224

211225

@@ -225,7 +239,8 @@ object PageRank extends Logging {
225239
teleport = oldPR*delta
226240

227241
val newPR = teleport + (1.0 - resetProb) * msgSum
228-
(newPR, newPR - oldPR)
242+
val newDelta = if (lastDelta == Double.NegativeInfinity) newPR else newPR - oldPR
243+
(newPR, newDelta)
229244
}
230245

231246
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
@@ -239,7 +254,7 @@ object PageRank extends Logging {
239254
def messageCombiner(a: Double, b: Double): Double = a + b
240255

241256
// The initial message received by all vertices in PageRank
242-
val initialMessage = resetProb / (1.0 - resetProb)
257+
val initialMessage = if (personalized) 0.0 else resetProb / (1.0 - resetProb)
243258

244259
// Execute a dynamic version of Pregel.
245260
val vp = if (personalized) {

graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -109,17 +109,22 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
109109
assert(notMatching === 0)
110110

111111
val staticErrors = staticRanks2.map { case (vid, pr) =>
112-
val correct = (vid > 0 && pr == resetProb) ||
113-
(vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb *
114-
(nVertices - 1)) )) < 1.0E-5)
112+
val correct = (vid > 0 && pr == 0.0) ||
113+
(vid == 0 && pr == resetProb)
115114
if (!correct) 1 else 0
116115
}
117116
assert(staticErrors.sum === 0)
118117

119118
val dynamicRanks = starGraph.personalizedPageRank(0, 0, resetProb).vertices.cache()
120119
assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
120+
121+
// We have one outbound edge from 1 to 0
122+
val otherStaticRanks2 = starGraph.staticPersonalizedPageRank(1, numIter = 2, resetProb)
123+
.vertices.cache()
124+
val otherDynamicRanks = starGraph.personalizedPageRank(1, 0, resetProb).vertices.cache()
125+
assert(compareRanks(otherDynamicRanks, otherStaticRanks2) < errorTol)
121126
}
122-
} // end of test Star PageRank
127+
} // end of test Star PersonalPageRank
123128

124129
test("Grid PageRank") {
125130
withSpark { sc =>

0 commit comments

Comments
 (0)