Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ class ALS private (
val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock, partitioner)
val (productInLinks, productOutLinks) =
makeLinkRDDs(numBlocks, ratingsByProductBlock, partitioner)
userInLinks.setName("userInLinks")
userOutLinks.setName("userOutLinks")
productInLinks.setName("productInLinks")
productOutLinks.setName("productOutLinks")

// Initialize user and product factors randomly, but use a deterministic seed for each
// partition so that fault recovery works
Expand All @@ -225,14 +229,14 @@ class ALS private (
// perform ALS update
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
// Persist users because it will be used twice.
users.persist()
users.setName(s"users-$iter").persist()
val YtY = Some(sc.broadcast(computeYtY(users)))
val previousProducts = products
products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
alpha, YtY)
previousProducts.unpersist()
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
products.persist()
products.setName(s"products-$iter").persist()
val XtX = Some(sc.broadcast(computeYtY(products)))
val previousUsers = users
users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
Expand All @@ -245,22 +249,24 @@ class ALS private (
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
alpha, YtY = None)
products.setName(s"products-$iter")
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
alpha, YtY = None)
users.setName(s"users-$iter")
}
}

// The last `products` will be used twice: once to generate the last `users` and once to
// generate `productsOut`. So we cache it for better performance.
products.persist()
products.setName("products").persist()

// Flatten and cache the two final RDDs to un-block them
val usersOut = unblockFactors(users, userOutLinks)
val productsOut = unblockFactors(products, productOutLinks)

usersOut.persist()
productsOut.persist()
usersOut.setName("usersOut").persist()
productsOut.setName("productsOut").persist()

// Materialize usersOut and productsOut.
usersOut.count()
Expand Down