From 8a6073f976021efe1e5a1d17c3afddf5e98494a1 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Tue, 9 May 2017 10:05:03 +0200
Subject: [PATCH 1/4] Use F2jBLAS and clean up code

---
 .../apache/spark/ml/recommendation/ALS.scala  | 28 ++++++-------------
 1 file changed, 9 insertions(+), 19 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index d626f0459967..4a252cedfd0e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -27,6 +27,7 @@ import scala.util.{Sorting, Try}
 import scala.util.hashing.byteswap64
 
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
+import com.github.fommil.netlib.F2jBLAS
 import org.apache.hadoop.fs.Path
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
@@ -363,7 +364,7 @@ class ALSModel private[ml] (
    * relatively efficient, the approach implemented here is significantly more efficient.
    *
    * This approach groups factors into blocks and computes the top-k elements per block,
-   * using a simple dot product (instead of gemm) and an efficient [[BoundedPriorityQueue]].
+   * using a dot product and an efficient [[BoundedPriorityQueue]] (instead of gemm).
    * It then computes the global top-k by aggregating the per block top-k elements with
    * a [[TopByKeyAggregator]]. This significantly reduces the size of intermediate and shuffle data.
    * This is the DataFrame equivalent to the approach used in
@@ -393,31 +394,18 @@ class ALSModel private[ml] (
         val m = srcIter.size
         val n = math.min(dstIter.size, num)
         val output = new Array[(Int, Int, Float)](m * n)
-        var j = 0
+        var i = 0
         val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
         srcIter.foreach { case (srcId, srcFactor) =>
           dstIter.foreach { case (dstId, dstFactor) =>
-            /*
-             * The below code is equivalent to
-             * `val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)`
-             * This handwritten version is as or more efficient as BLAS calls in this case.
-             */
-            var score = 0.0f
-            var k = 0
-            while (k < rank) {
-              score += srcFactor(k) * dstFactor(k)
-              k += 1
-            }
+            // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
+            val score = ALSModel._f2jBLAS.sdot(rank, srcFactor, 1, dstFactor, 1)
             pq += dstId -> score
           }
-          val pqIter = pq.iterator
-          var i = 0
-          while (i < n) {
-            val (dstId, score) = pqIter.next()
-            output(j + i) = (srcId, dstId, score)
+          pq.foreach { case (dstId, score) =>
+            output(i) = (srcId, dstId, score)
             i += 1
           }
-          j += n
           pq.clear()
         }
         output.toSeq
@@ -451,6 +439,8 @@ class ALSModel private[ml] (
 @Since("1.6.0")
 object ALSModel extends MLReadable[ALSModel] {
 
+  @transient private[recommendation] val _f2jBLAS = new F2jBLAS
+
   private val NaN = "nan"
   private val Drop = "drop"
   private[recommendation] final val supportedColdStartStrategies = Array(NaN, Drop)

From 301e8b89691effc065a128b4eb0569e421810189 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Tue, 9 May 2017 10:23:48 +0200
Subject: [PATCH 2/4] mllib version

---
 .../MatrixFactorizationModel.scala            | 53 ++++++++-----------
 1 file changed, 21 insertions(+), 32 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index d45866c016d9..7ef3ff312cf6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -20,10 +20,9 @@ package org.apache.spark.mllib.recommendation
 import java.io.IOException
 import java.lang.{Integer => JavaInteger}
 
-import scala.collection.mutable
-
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
+import com.github.fommil.netlib.F2jBLAS
 import org.apache.hadoop.fs.Path
 import org.json4s._
 import org.json4s.JsonDSL._
@@ -33,7 +32,6 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
 import org.apache.spark.internal.Logging
-import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
@@ -248,6 +246,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
 
   import org.apache.spark.mllib.util.Loader._
 
+  @transient private[recommendation] val _f2jBLAS = new F2jBLAS
+
   /**
    * Makes recommendations for a single user (or product).
   */
@@ -263,6 +263,19 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
 
   /**
    * Makes recommendations for all users (or products).
+   *
+   * Note: the previous approach used for computing top-k recommendations aimed to group
+   * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could
+   * be used for efficiency. However, this causes excessive GC pressure due to the large
+   * arrays required for intermediate result storage, as well as a high sensitivity to the
+   * block size used.
+   *
+   * The following approach still groups factors into blocks, but instead computes the
+   * top-k elements per block, using a dot product and an efficient [[BoundedPriorityQueue]]
+   * (instead of gemm). This avoids any large intermediate data structures and results
+   * in significantly reduced GC pressure as well as shuffle data, which far outweighs
+   * any cost incurred from not using Level 3 BLAS operations.
+   *
    * @param rank rank
    * @param srcFeatures src features to receive recommendations
    * @param dstFeatures dst features used to make recommendations
@@ -277,46 +290,22 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
     val srcBlocks = blockify(srcFeatures)
     val dstBlocks = blockify(dstFeatures)
-    /**
-     * The previous approach used for computing top-k recommendations aimed to group
-     * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could
-     * be used for efficiency. However, this causes excessive GC pressure due to the large
-     * arrays required for intermediate result storage, as well as a high sensitivity to the
-     * block size used.
-     * The following approach still groups factors into blocks, but instead computes the
-     * top-k elements per block, using a simple dot product (instead of gemm) and an efficient
-     * [[BoundedPriorityQueue]]. This avoids any large intermediate data structures and results
-     * in significantly reduced GC pressure as well as shuffle data, which far outweighs
-     * any cost incurred from not using Level 3 BLAS operations.
-     */
     val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
       val m = srcIter.size
       val n = math.min(dstIter.size, num)
       val output = new Array[(Int, (Int, Double))](m * n)
-      var j = 0
+      var i = 0
       val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
       srcIter.foreach { case (srcId, srcFactor) =>
         dstIter.foreach { case (dstId, dstFactor) =>
-          /*
-           * The below code is equivalent to
-           * `val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1)`
-           * This handwritten version is as or more efficient as BLAS calls in this case.
-           */
-          var score: Double = 0
-          var k = 0
-          while (k < rank) {
-            score += srcFactor(k) * dstFactor(k)
-            k += 1
-          }
+          // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
+          val score = _f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
           pq += dstId -> score
         }
-        val pqIter = pq.iterator
-        var i = 0
-        while (i < n) {
-          output(j + i) = (srcId, pqIter.next())
+        pq.foreach { case (dstId, score) =>
+          output(i) = (srcId, (dstId, score))
           i += 1
         }
-        j += n
         pq.clear()
       }
       output.toSeq

From 0b1eaa34c370bfae7d83190a43d84fae1dc69eb8 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Tue, 9 May 2017 10:39:25 +0200
Subject: [PATCH 3/4] No need for 'recommendation' private scope

---
 .../spark/mllib/recommendation/MatrixFactorizationModel.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 7ef3ff312cf6..d339fad1f586 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -246,7 +246,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
 
   import org.apache.spark.mllib.util.Loader._
 
-  @transient private[recommendation] val _f2jBLAS = new F2jBLAS
+  @transient private val _f2jBLAS = new F2jBLAS
 
   /**
    * Makes recommendations for a single user (or product).
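
Patches 1-3 replace the handwritten dot-product loop with an F2jBLAS level-1 call
inside a per-block top-k scan over a bounded priority queue. For readers outside the
Spark source tree, the sketch below shows that pattern in isolation. It is a minimal
sketch, not the patched code itself: Spark's BoundedPriorityQueue is private[spark],
so a small bounded min-heap built on scala.collection.mutable.PriorityQueue stands in
for it, and the names TopKSketch, topKPerBlock, srcBlock and dstBlock are illustrative.

    import scala.collection.mutable

    import com.github.fommil.netlib.F2jBLAS

    object TopKSketch {
      private val f2jBLAS = new F2jBLAS

      /** Top `num` (srcId, dstId, score) triples for one pair of factor blocks. */
      def topKPerBlock(
          srcBlock: Seq[(Int, Array[Float])],
          dstBlock: Seq[(Int, Array[Float])],
          rank: Int,
          num: Int): Seq[(Int, Int, Float)] = {
        val n = math.min(dstBlock.size, num)
        val output = new Array[(Int, Int, Float)](srcBlock.size * n)
        var i = 0
        // Min-heap by score: the head holds the smallest retained score, and the
        // heap is capped at `num` entries, so memory stays O(num) per source row.
        val pq = mutable.PriorityQueue.empty[(Int, Float)](Ordering.by[(Int, Float), Float](p => -p._2))
        srcBlock.foreach { case (srcId, srcFactor) =>
          dstBlock.foreach { case (dstId, dstFactor) =>
            // Level-1 dot product via F2jBLAS, as in the patches.
            val score = f2jBLAS.sdot(rank, srcFactor, 1, dstFactor, 1)
            if (pq.size < num) {
              pq.enqueue(dstId -> score)
            } else if (score > pq.head._2) {
              pq.dequeue()
              pq.enqueue(dstId -> score)
            }
          }
          // Drain into the preallocated output array; the emptied heap is then
          // reused for the next source row, mirroring pq.clear() in the patches.
          while (pq.nonEmpty) {
            val (dstId, score) = pq.dequeue()
            output(i) = (srcId, dstId, score)
            i += 1
          }
        }
        output.toSeq
      }
    }

Each block pair emits at most `num` candidates per source row, which is what keeps the
intermediate and shuffle data small relative to the gemm-based approach.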
From 9dfad1bffe30163eab5a42eeda3ec1ec38783168 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Thu, 11 May 2017 09:41:11 +0200
Subject: [PATCH 4/4] Expose {ml, mllib}-private f2jBLAS and use that

---
 .../src/main/scala/org/apache/spark/ml/linalg/BLAS.scala       | 2 +-
 .../main/scala/org/apache/spark/ml/recommendation/ALS.scala    | 6 ++----
 .../src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala    | 2 +-
 .../mllib/recommendation/MatrixFactorizationModel.scala        | 6 ++----
 4 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index ef3890962494..2a0f8c11d0a5 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -29,7 +29,7 @@ private[spark] object BLAS extends Serializable {
   @transient private var _nativeBLAS: NetlibBLAS = _
 
   // For level-1 routines, we use Java implementation.
-  private def f2jBLAS: NetlibBLAS = {
+  private[ml] def f2jBLAS: NetlibBLAS = {
     if (_f2jBLAS == null) {
       _f2jBLAS = new F2jBLAS
     }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 4a252cedfd0e..0955d3e6e1f8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -27,7 +27,6 @@ import scala.util.{Sorting, Try}
 import scala.util.hashing.byteswap64
 
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
-import com.github.fommil.netlib.F2jBLAS
 import org.apache.hadoop.fs.Path
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
@@ -36,6 +35,7 @@ import org.apache.spark.{Dependency, Partitioner, ShuffleDependency, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.linalg.BLAS
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -399,7 +399,7 @@ class ALSModel private[ml] (
         srcIter.foreach { case (srcId, srcFactor) =>
           dstIter.foreach { case (dstId, dstFactor) =>
             // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
-            val score = ALSModel._f2jBLAS.sdot(rank, srcFactor, 1, dstFactor, 1)
+            val score = BLAS.f2jBLAS.sdot(rank, srcFactor, 1, dstFactor, 1)
             pq += dstId -> score
           }
           pq.foreach { case (dstId, score) =>
@@ -439,8 +439,6 @@ class ALSModel private[ml] (
 @Since("1.6.0")
 object ALSModel extends MLReadable[ALSModel] {
 
-  @transient private[recommendation] val _f2jBLAS = new F2jBLAS
-
   private val NaN = "nan"
   private val Drop = "drop"
   private[recommendation] final val supportedColdStartStrategies = Array(NaN, Drop)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
index 0cd68a633c0b..cb9774224568 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
@@ -31,7 +31,7 @@ private[spark] object BLAS extends Serializable with Logging {
   @transient private var _nativeBLAS: NetlibBLAS = _
 
   // For level-1 routines, we use Java implementation.
-  private def f2jBLAS: NetlibBLAS = {
+  private[mllib] def f2jBLAS: NetlibBLAS = {
     if (_f2jBLAS == null) {
       _f2jBLAS = new F2jBLAS
     }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index d339fad1f586..ac709ad72f0c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -22,7 +22,6 @@ import java.lang.{Integer => JavaInteger}
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
-import com.github.fommil.netlib.F2jBLAS
 import org.apache.hadoop.fs.Path
 import org.json4s._
 import org.json4s.JsonDSL._
@@ -32,6 +31,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
 import org.apache.spark.internal.Logging
+import org.apache.spark.mllib.linalg.BLAS
 import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
@@ -246,8 +246,6 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
 
   import org.apache.spark.mllib.util.Loader._
 
-  @transient private val _f2jBLAS = new F2jBLAS
-
   /**
    * Makes recommendations for a single user (or product).
   */
@@ -299,7 +297,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       srcIter.foreach { case (srcId, srcFactor) =>
         dstIter.foreach { case (dstId, dstFactor) =>
           // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
-          val score = _f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
+          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
           pq += dstId -> score
         }
         pq.foreach { case (dstId, score) =>
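
For reference, the `f2jBLAS` accessor exposed by this patch resolves to netlib-java's
pure-Java implementation, and the level-1 call is a drop-in replacement for the
handwritten loop deleted in patches 1 and 2. A minimal sketch of that equivalence,
assuming only netlib-java on the classpath (the object name DotEquivalence is
illustrative):

    import com.github.fommil.netlib.F2jBLAS

    object DotEquivalence {
      def main(args: Array[String]): Unit = {
        val f2j = new F2jBLAS
        val rank = 8
        val srcFactor = Array.tabulate(rank)(k => (k + 1).toDouble)
        val dstFactor = Array.tabulate(rank)(k => 0.5 * (k + 1))

        // The BLAS level-1 call the patches standardize on...
        val viaBlas = f2j.ddot(rank, srcFactor, 1, dstFactor, 1)

        // ...computes the same value as the loop it replaced.
        var viaLoop = 0.0
        var k = 0
        while (k < rank) {
          viaLoop += srcFactor(k) * dstFactor(k)
          k += 1
        }
        assert(math.abs(viaBlas - viaLoop) < 1e-12)
      }
    }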