Skip to content

Commit b8d2580

Browse files
nevillelyhmengxr
authored andcommitted
[MLLIB] set RDD names in ALS
This is very useful when debugging & fine tuning jobs with large data sets. Author: Neville Li <[email protected]> Closes apache#966 from nevillelyh/master and squashes the following commits: 6747764 [Neville Li] [MLLIB] use string interpolation for RDD names 3b15d34 [Neville Li] [MLLIB] set RDD names in ALS
1 parent c402a4a commit b8d2580

File tree

1 file changed

+11
-5
lines changed
  • mllib/src/main/scala/org/apache/spark/mllib/recommendation

1 file changed

+11
-5
lines changed

mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ class ALS private (
201201
val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock, partitioner)
202202
val (productInLinks, productOutLinks) =
203203
makeLinkRDDs(numBlocks, ratingsByProductBlock, partitioner)
204+
userInLinks.setName("userInLinks")
205+
userOutLinks.setName("userOutLinks")
206+
productInLinks.setName("productInLinks")
207+
productOutLinks.setName("productOutLinks")
204208

205209
// Initialize user and product factors randomly, but use a deterministic seed for each
206210
// partition so that fault recovery works
@@ -225,14 +229,14 @@ class ALS private (
225229
// perform ALS update
226230
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
227231
// Persist users because it will be called twice.
228-
users.persist()
232+
users.setName(s"users-$iter").persist()
229233
val YtY = Some(sc.broadcast(computeYtY(users)))
230234
val previousProducts = products
231235
products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
232236
alpha, YtY)
233237
previousProducts.unpersist()
234238
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
235-
products.persist()
239+
products.setName(s"products-$iter").persist()
236240
val XtX = Some(sc.broadcast(computeYtY(products)))
237241
val previousUsers = users
238242
users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
@@ -245,22 +249,24 @@ class ALS private (
245249
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
246250
products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
247251
alpha, YtY = None)
252+
products.setName(s"products-$iter")
248253
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
249254
users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
250255
alpha, YtY = None)
256+
users.setName(s"users-$iter")
251257
}
252258
}
253259

254260
// The last `products` will be used twice. One to generate the last `users` and the other to
255261
// generate `productsOut`. So we cache it for better performance.
256-
products.persist()
262+
products.setName("products").persist()
257263

258264
// Flatten and cache the two final RDDs to un-block them
259265
val usersOut = unblockFactors(users, userOutLinks)
260266
val productsOut = unblockFactors(products, productOutLinks)
261267

262-
usersOut.persist()
263-
productsOut.persist()
268+
usersOut.setName("usersOut").persist()
269+
productsOut.setName("productsOut").persist()
264270

265271
// Materialize usersOut and productsOut.
266272
usersOut.count()

0 commit comments

Comments
 (0)