@@ -39,6 +39,7 @@ import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.BoundedPriorityQueue
 
 /**
  * Model representing the result of matrix factorization.
@@ -274,46 +275,64 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(rank, srcFeatures)
-    val dstBlocks = blockify(rank, dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
-      case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
-        val m = srcIds.length
-        val n = dstIds.length
-        val ratings = srcFactors.transpose.multiply(dstFactors)
-        val output = new Array[(Int, (Int, Double))](m * n)
-        var k = 0
-        ratings.foreachActive { (i, j, r) =>
-          output(k) = (srcIds(i), (dstIds(j), r))
-          k += 1
-        }
-        output.toSeq
-    }
+    val srcBlocks = blockify(srcFeatures)
+    val dstBlocks = blockify(dstFeatures)
+    /**
+     * The previous approach used for computing top-k recommendations aimed to group
+     * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could
+     * be used for efficiency. However, this causes excessive GC pressure due to the large
+     * arrays required for intermediate result storage, as well as a high sensitivity to the
+     * block size used.
+     * The following approach still groups factors into blocks, but instead computes the
+     * top-k elements per block, using a simple dot product (instead of gemm) and an efficient
+     * [[BoundedPriorityQueue]]. This avoids any large intermediate data structures and results
+     * in significantly reduced GC pressure as well as shuffle data, which far outweighs
+     * any cost incurred from not using Level 3 BLAS operations.
+     */
+    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
+      val m = srcIter.size
+      val n = math.min(dstIter.size, num)
+      val output = new Array[(Int, (Int, Double))](m * n)
+      var j = 0
+      val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
+      srcIter.foreach { case (srcId, srcFactor) =>
+        dstIter.foreach { case (dstId, dstFactor) =>
+          /*
+           * The code below is equivalent to
+           * `val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1)`.
+           * This handwritten version is as efficient as, or more efficient than,
+           * BLAS calls in this case.
+           */
+          var score: Double = 0
+          var k = 0
+          while (k < rank) {
+            score += srcFactor(k) * dstFactor(k)
+            k += 1
+          }
+          pq += dstId -> score
+        }
+        val pqIter = pq.iterator
+        var i = 0
+        while (i < n) {
+          output(j + i) = (srcId, pqIter.next())
+          i += 1
+        }
+        j += n
+        pq.clear()
+      }
+      output.toSeq
+    }
     ratings.topByKey(num)(Ordering.by(_._2))
   }
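To make the approach described in the block comment above concrete, here is a minimal standalone sketch of the same per-block top-k idea on plain Scala collections. It is not the PR code: `TopKSketch`, `dot`, and `topKPerSource` are hypothetical names, and the standard library's `mutable.PriorityQueue` (capped at size `num` by hand) stands in for Spark's internal `BoundedPriorityQueue`; the handwritten dot-product loop mirrors the `blas.ddot` equivalence noted in the diff.

import scala.collection.mutable

object TopKSketch {
  // Handwritten dot product, mirroring the while-loop in the diff
  // (equivalent to blas.ddot(a.length, a, 1, b, 1)).
  private def dot(a: Array[Double], b: Array[Double]): Double = {
    var s = 0.0
    var k = 0
    while (k < a.length) {
      s += a(k) * b(k)
      k += 1
    }
    s
  }

  // For each source factor, score every destination factor but keep only the
  // `num` best in a size-capped min-heap, so per-source state is O(num)
  // rather than the dense m-by-n score matrix the old gemm approach built.
  def topKPerSource(
      src: Seq[(Int, Array[Double])],
      dst: Seq[(Int, Array[Double])],
      num: Int): Seq[(Int, Array[(Int, Double)])] = {
    src.map { case (srcId, srcFactor) =>
      // Min-heap on score: the head is the weakest of the current top `num`.
      val pq = mutable.PriorityQueue.empty[(Int, Double)](
        Ordering.by[(Int, Double), Double](-_._2))
      dst.foreach { case (dstId, dstFactor) =>
        val score = dot(srcFactor, dstFactor)
        if (pq.size < num) {
          pq.enqueue(dstId -> score)
        } else if (score > pq.head._2) {
          pq.dequeue() // evict the weakest candidate
          pq.enqueue(dstId -> score)
        }
      }
      // dequeueAll yields weakest-first under this ordering; reverse it to
      // return destinations in descending score order.
      srcId -> pq.dequeueAll.reverse.toArray
    }
  }
}

The essential property is that no structure ever holds more than `num` candidates per source row, which is what relieves the GC and shuffle pressure in the distributed version.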

   /**
-   * Blockifies features to use Level-3 BLAS.
+   * Blockifies features to improve the efficiency of the cartesian product.
+   * TODO: SPARK-20443 - expose blockSize as a param?
    */
Contributor:
We should adjust the comment here as we're not using Level-3 BLAS any more.

   private def blockify(
-      rank: Int,
-      features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = {
-    val blockSize = 4096 // TODO: tune the block size
-    val blockStorage = rank * blockSize
+      features: RDD[(Int, Array[Double])],
+      blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = {
     features.mapPartitions { iter =>
-      iter.grouped(blockSize).map { grouped =>
-        val ids = mutable.ArrayBuilder.make[Int]
-        ids.sizeHint(blockSize)
-        val factors = mutable.ArrayBuilder.make[Double]
-        factors.sizeHint(blockStorage)
-        var i = 0
-        grouped.foreach { case (id, factor) =>
-          ids += id
-          factors ++= factor
-          i += 1
-        }
-        (ids.result(), new DenseMatrix(rank, i, factors.result()))
-      }
+      iter.grouped(blockSize)
     }
   }
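For intuition about what the slimmed-down blockify now emits, here is a toy example (not part of the PR; the ids, factors, and block size of 2 are made up). `Iterator.grouped` simply chops the stream of `(id, factor)` rows into `Seq` blocks of at most `blockSize` elements:

val rows: Iterator[(Int, Array[Double])] = Iterator(
  (1, Array(0.1, 0.2)),
  (2, Array(0.3, 0.4)),
  (3, Array(0.5, 0.6)))

val blocks: Iterator[Seq[(Int, Array[Double])]] = rows.grouped(2)
blocks.foreach(block => println(block.map(_._1))) // List(1, 2), then List(3)

Pairing blocks rather than individual rows keeps the number of cartesian() task pairs manageable, while the per-element dot product above avoids materializing a dense score matrix for each block pair.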
