From 14cdbf63e79ebcf2d1207c79b0b4ba73e15729b2 Mon Sep 17 00:00:00 2001 From: Peng Date: Mon, 24 Apr 2017 16:32:16 +0800 Subject: [PATCH 1/9] Optimize ALS recommendForAll --- .../MatrixFactorizationModel.scala | 55 ++++++++----------- 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 23045fa2b686..14abf9b08a09 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -17,17 +17,17 @@ package org.apache.spark.mllib.recommendation -import java.io.IOException -import java.lang.{Integer => JavaInteger} - -import scala.collection.mutable - +import breeze.linalg.min import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import com.github.fommil.netlib.BLAS.{getInstance => blas} +import java.io.IOException +import java.lang.{Integer => JavaInteger} import org.apache.hadoop.fs.Path import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ +import scala.collection.mutable +import scala.collection.mutable.PriorityQueue import org.apache.spark.SparkContext import org.apache.spark.annotation.Since @@ -277,17 +277,23 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { val srcBlocks = blockify(rank, srcFeatures) val dstBlocks = blockify(rank, dstFeatures) val ratings = srcBlocks.cartesian(dstBlocks).flatMap { - case ((srcIds, srcFactors), (dstIds, dstFactors)) => - val m = srcIds.length - val n = dstIds.length - val ratings = srcFactors.transpose.multiply(dstFactors) - val output = new Array[(Int, (Int, Double))](m * n) - var k = 0 - ratings.foreachActive { (i, j, r) => - output(k) = (srcIds(i), (dstIds(j), r)) - k += 1 - } - output.toSeq + case (users, items) => + val m = users.size + val n = min(items.size, num) + val output = new Array[(Int, (Int, Double))](m * n) + var j = 0 + users.foreach (user => { + def order(a: (Int, Double)) = a._2 + val pq: PriorityQueue[(Int, Double)] = PriorityQueue()(Ordering.by(order)) + items.foreach (item => { + val rate = blas.ddot(rank, user._2, 1, item._2, 1) + pq.enqueue((item._1, rate)) + }) + for(i <- 0 to n-1) + output(j + i) = (user._1, pq.dequeue()) + j += n + }) + output.toSeq } ratings.topByKey(num)(Ordering.by(_._2)) } @@ -297,23 +303,10 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { */ private def blockify( rank: Int, - features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = { + features: RDD[(Int, Array[Double])]): RDD[Seq[(Int, Array[Double])]] = { val blockSize = 4096 // TODO: tune the block size - val blockStorage = rank * blockSize features.mapPartitions { iter => - iter.grouped(blockSize).map { grouped => - val ids = mutable.ArrayBuilder.make[Int] - ids.sizeHint(blockSize) - val factors = mutable.ArrayBuilder.make[Double] - factors.sizeHint(blockStorage) - var i = 0 - grouped.foreach { case (id, factor) => - ids += id - factors ++= factor - i += 1 - } - (ids.result(), new DenseMatrix(rank, i, factors.result())) - } + iter.grouped(blockSize) } } From f607e6c441559c5d48cd4c9dac58960b98c2198a Mon Sep 17 00:00:00 2001 From: Peng Date: Mon, 24 Apr 2017 23:00:57 +0800 Subject: [PATCH 2/9] change breaze.min to math.min --- .../spark/mllib/recommendation/MatrixFactorizationModel.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 14abf9b08a09..ae3b44a1652d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -17,7 +17,6 @@ package org.apache.spark.mllib.recommendation -import breeze.linalg.min import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import com.github.fommil.netlib.BLAS.{getInstance => blas} import java.io.IOException @@ -279,7 +278,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (users, items) => val m = users.size - val n = min(items.size, num) + val n = math.min(items.size, num) val output = new Array[(Int, (Int, Double))](m * n) var j = 0 users.foreach (user => { From ae0312491e3d5a193626d4de1db7d7a101c7482d Mon Sep 17 00:00:00 2001 From: Peng Meng Date: Wed, 26 Apr 2017 19:00:16 +0800 Subject: [PATCH 3/9] use BoundedProrityQueue to replace ProrityQueue --- .../MatrixFactorizationModel.scala | 36 ++++++++++++++----- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index ae3b44a1652d..ea7b40d123d8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -17,16 +17,17 @@ package org.apache.spark.mllib.recommendation -import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus -import com.github.fommil.netlib.BLAS.{getInstance => blas} import java.io.IOException import java.lang.{Integer => JavaInteger} + +import scala.collection.mutable + +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus +import com.github.fommil.netlib.BLAS.{getInstance => blas} import org.apache.hadoop.fs.Path import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ -import scala.collection.mutable -import scala.collection.mutable.PriorityQueue import org.apache.spark.SparkContext import org.apache.spark.annotation.Since @@ -38,6 +39,7 @@ import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.BoundedPriorityQueue /** * Model representing the result of matrix factorization. @@ -283,13 +285,29 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { var j = 0 users.foreach (user => { def order(a: (Int, Double)) = a._2 - val pq: PriorityQueue[(Int, Double)] = PriorityQueue()(Ordering.by(order)) + val pq: BoundedPriorityQueue[(Int, Double)] = + new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(order)) items.foreach (item => { - val rate = blas.ddot(rank, user._2, 1, item._2, 1) - pq.enqueue((item._1, rate)) + /** + * blas.ddot (F2jBLAS) is the same performance with the following code. + * the performace of blas.ddot with NativeBLAS is very bad. + * blas.ddot (F2jBLAS) is about 10% improvement comparing with linalg.dot. + * val rate = blas.ddot(rank, user._2, 1, item._2, 1) + */ + var rate: Double = 0 + var k = 0 + while(k < rank) { + rate += user._2(k) * item._2(k) + k += 1 + } + pq += ((item._1, rate)) }) - for(i <- 0 to n-1) - output(j + i) = (user._1, pq.dequeue()) + val pqIter = pq.iterator + var i = 0 + while(i < n) { + output(j + i) = (user._1, pqIter.next()) + i += 1 + } j += n }) output.toSeq From 2fd97a043c9ca72a04012a4b4cef06ecc023917c Mon Sep 17 00:00:00 2001 From: Peng Meng Date: Wed, 26 Apr 2017 20:56:21 +0800 Subject: [PATCH 4/9] change code format --- .../MatrixFactorizationModel.scala | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index ea7b40d123d8..4460746b9ab3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -277,39 +277,38 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { num: Int): RDD[(Int, Array[(Int, Double)])] = { val srcBlocks = blockify(rank, srcFeatures) val dstBlocks = blockify(rank, dstFeatures) - val ratings = srcBlocks.cartesian(dstBlocks).flatMap { - case (users, items) => - val m = users.size - val n = math.min(items.size, num) + val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => + val m = srcIter.size + val n = math.min(dstIter.size, num) val output = new Array[(Int, (Int, Double))](m * n) var j = 0 - users.foreach (user => { + srcIter.foreach { case (srcId, srcFactor) => def order(a: (Int, Double)) = a._2 val pq: BoundedPriorityQueue[(Int, Double)] = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(order)) - items.foreach (item => { + dstIter.foreach { case (dstId, dstFactor) => /** * blas.ddot (F2jBLAS) is the same performance with the following code. * the performace of blas.ddot with NativeBLAS is very bad. * blas.ddot (F2jBLAS) is about 10% improvement comparing with linalg.dot. * val rate = blas.ddot(rank, user._2, 1, item._2, 1) */ - var rate: Double = 0 + var score: Double = 0 var k = 0 - while(k < rank) { - rate += user._2(k) * item._2(k) + while (k < rank) { + score += srcFactor(k) * dstFactor(k) k += 1 } - pq += ((item._1, rate)) - }) + pq += ((dstId, score)) + } val pqIter = pq.iterator var i = 0 - while(i < n) { - output(j + i) = (user._1, pqIter.next()) + while (i < n) { + output(j + i) = (srcId, pqIter.next()) i += 1 } j += n - }) + } output.toSeq } ratings.topByKey(num)(Ordering.by(_._2)) From 206a023433805e8d55b0cb30eebde130b4245bf9 Mon Sep 17 00:00:00 2001 From: Peng Date: Fri, 28 Apr 2017 10:18:30 +0800 Subject: [PATCH 5/9] improve the comments of the code --- .../MatrixFactorizationModel.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 4460746b9ab3..c16278a86fac 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -277,21 +277,22 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { num: Int): RDD[(Int, Array[(Int, Double)])] = { val srcBlocks = blockify(rank, srcFeatures) val dstBlocks = blockify(rank, dstFeatures) + /** + * Use dot to replace blas 3 gemm is the key approach to improve efficiency. + * By this change, we can get the topK elements of each block to reduce the GC time. + * Comparing with BLAS.dot, hand writing dot is high efficiency. + */ val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => val m = srcIter.size val n = math.min(dstIter.size, num) val output = new Array[(Int, (Int, Double))](m * n) var j = 0 srcIter.foreach { case (srcId, srcFactor) => - def order(a: (Int, Double)) = a._2 - val pq: BoundedPriorityQueue[(Int, Double)] = - new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(order)) + val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) dstIter.foreach { case (dstId, dstFactor) => /** - * blas.ddot (F2jBLAS) is the same performance with the following code. - * the performace of blas.ddot with NativeBLAS is very bad. - * blas.ddot (F2jBLAS) is about 10% improvement comparing with linalg.dot. - * val rate = blas.ddot(rank, user._2, 1, item._2, 1) + * The below code is equivalent to + * val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1) */ var score: Double = 0 var k = 0 @@ -315,7 +316,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { } /** - * Blockifies features to use Level-3 BLAS. + * Blockifies features to improve the efficiency of cartesian product */ private def blockify( rank: Int, From 8eab55bccd51706d45e0ccb2281114df4310899c Mon Sep 17 00:00:00 2001 From: Peng Date: Fri, 28 Apr 2017 13:13:04 +0800 Subject: [PATCH 6/9] fix typo --- .../spark/mllib/recommendation/MatrixFactorizationModel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index c16278a86fac..4a9d3e6124ec 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -280,7 +280,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { /** * Use dot to replace blas 3 gemm is the key approach to improve efficiency. * By this change, we can get the topK elements of each block to reduce the GC time. - * Comparing with BLAS.dot, hand writing dot is high efficiency. + * Comparing with BLAS.dot, hand-written dot is high efficiency. */ val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => val m = srcIter.size From a5261b737ff046ff719c219b94af0f1dff5bf2e2 Mon Sep 17 00:00:00 2001 From: Peng Date: Fri, 28 Apr 2017 14:22:33 +0800 Subject: [PATCH 7/9] fix indent 4 space --- .../MatrixFactorizationModel.scala | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 4a9d3e6124ec..1c996d97c93b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -288,27 +288,27 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { val output = new Array[(Int, (Int, Double))](m * n) var j = 0 srcIter.foreach { case (srcId, srcFactor) => - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - dstIter.foreach { case (dstId, dstFactor) => - /** - * The below code is equivalent to - * val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1) - */ - var score: Double = 0 - var k = 0 - while (k < rank) { - score += srcFactor(k) * dstFactor(k) - k += 1 - } - pq += ((dstId, score)) + val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) + dstIter.foreach { case (dstId, dstFactor) => + /** + * The below code is equivalent to + * val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1) + */ + var score: Double = 0 + var k = 0 + while (k < rank) { + score += srcFactor(k) * dstFactor(k) + k += 1 } - val pqIter = pq.iterator - var i = 0 - while (i < n) { - output(j + i) = (srcId, pqIter.next()) - i += 1 - } - j += n + pq += ((dstId, score)) + } + val pqIter = pq.iterator + var i = 0 + while (i < n) { + output(j + i) = (srcId, pqIter.next()) + i += 1 + } + j += n } output.toSeq } From 44a4f7483daf82f42e1b12c659384c955281068b Mon Sep 17 00:00:00 2001 From: Peng Meng Date: Wed, 3 May 2017 23:12:49 +0800 Subject: [PATCH 8/9] move PriorityQueue out of foreach, improve comments --- .../MatrixFactorizationModel.scala | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 1c996d97c93b..751d0c298399 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -275,24 +275,31 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { - val srcBlocks = blockify(rank, srcFeatures) - val dstBlocks = blockify(rank, dstFeatures) + val srcBlocks = blockify(srcFeatures) + val dstBlocks = blockify(dstFeatures) /** - * Use dot to replace blas 3 gemm is the key approach to improve efficiency. - * By this change, we can get the topK elements of each block to reduce the GC time. - * Comparing with BLAS.dot, hand-written dot is high efficiency. + * The previous approach used for computing top-k recommendations aimed to group + * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could + * be used for efficiency. However, this causes excessive GC pressure due to the large + * arrays required for intermediate result storage, as well as a high sensitivity to the + * block size used. + * The following approach still groups factors into blocks, but instead computes the + * top-k elements per block, using Level 1 BLAS (dot) and an efficient + * BoundedPriorityQueue. This avoids any large intermediate data structures and results + * in significantly reduced GC pressure as well as shuffle data, which far outweighs + * any cost incurred from not using Level 3 BLAS operations. */ val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => val m = srcIter.size val n = math.min(dstIter.size, num) val output = new Array[(Int, (Int, Double))](m * n) var j = 0 + val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) srcIter.foreach { case (srcId, srcFactor) => - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) dstIter.foreach { case (dstId, dstFactor) => /** - * The below code is equivalent to - * val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1) + * Compared with BLAS.dot, the hand-written version below is more efficient than a call + * to the native BLAS backend and the same performance as the fallback F2jBLAS backend. */ var score: Double = 0 var k = 0 @@ -309,6 +316,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { i += 1 } j += n + pq.clear() } output.toSeq } @@ -319,7 +327,6 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { * Blockifies features to improve the efficiency of cartesian product */ private def blockify( - rank: Int, features: RDD[(Int, Array[Double])]): RDD[Seq[(Int, Array[Double])]] = { val blockSize = 4096 // TODO: tune the block size features.mapPartitions { iter => From 17df4cf8aabd5f4e28551e31d43b0231c1e68676 Mon Sep 17 00:00:00 2001 From: Peng Meng Date: Thu, 4 May 2017 21:49:21 +0800 Subject: [PATCH 9/9] match comments with PR 17854 --- .../MatrixFactorizationModel.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 751d0c298399..d45866c016d9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -284,8 +284,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { * arrays required for intermediate result storage, as well as a high sensitivity to the * block size used. * The following approach still groups factors into blocks, but instead computes the - * top-k elements per block, using Level 1 BLAS (dot) and an efficient - * BoundedPriorityQueue. This avoids any large intermediate data structures and results + * top-k elements per block, using a simple dot product (instead of gemm) and an efficient + * [[BoundedPriorityQueue]]. This avoids any large intermediate data structures and results * in significantly reduced GC pressure as well as shuffle data, which far outweighs * any cost incurred from not using Level 3 BLAS operations. */ @@ -297,9 +297,10 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) srcIter.foreach { case (srcId, srcFactor) => dstIter.foreach { case (dstId, dstFactor) => - /** - * Compared with BLAS.dot, the hand-written version below is more efficient than a call - * to the native BLAS backend and the same performance as the fallback F2jBLAS backend. + /* + * The below code is equivalent to + * `val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1)` + * This handwritten version is as or more efficient as BLAS calls in this case. */ var score: Double = 0 var k = 0 @@ -307,7 +308,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { score += srcFactor(k) * dstFactor(k) k += 1 } - pq += ((dstId, score)) + pq += dstId -> score } val pqIter = pq.iterator var i = 0 @@ -325,10 +326,11 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { /** * Blockifies features to improve the efficiency of cartesian product + * TODO: SPARK-20443 - expose blockSize as a param? */ private def blockify( - features: RDD[(Int, Array[Double])]): RDD[Seq[(Int, Array[Double])]] = { - val blockSize = 4096 // TODO: tune the block size + features: RDD[(Int, Array[Double])], + blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = { features.mapPartitions { iter => iter.grouped(blockSize) }