From b923b56e874578b32c25989ad01e1345d9be1413 Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Mon, 23 Nov 2020 10:54:54 +0800
Subject: [PATCH 1/6] use gemm

---
 .../apache/spark/ml/recommendation/ALS.scala  | 46 ++++++++++++-------
 1 file changed, 29 insertions(+), 17 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 088f6a682be8..cf9a35253eca 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -460,27 +460,36 @@ class ALSModel private[ml] (
     val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
     val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
     val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
-      .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
-      .flatMap { case (srcIter, dstIter) =>
-        val m = srcIter.size
-        val n = math.min(dstIter.size, num)
-        val output = new Array[(Int, Int, Float)](m * n)
-        var i = 0
+      .as[(Array[Int], Array[Float], Array[Int], Array[Float])]
+      .mapPartitions { iter =>
+        var buffer: Array[Float] = null
         val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
-        srcIter.foreach { case (srcId, srcFactor) =>
-          dstIter.foreach { case (dstId, dstFactor) =>
-            // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
-            val score = BLAS.f2jBLAS.sdot(rank, srcFactor, 1, dstFactor, 1)
-            pq += dstId -> score
+        iter.flatMap { case (srcIds, srcMat, dstIds, dstMat) =>
+          require(srcMat.length == srcIds.length * rank)
+          require(dstMat.length == dstIds.length * rank)
+          val m = srcIds.length
+          val n = dstIds.length
+          if (buffer == null || buffer.length < m * n) {
+            buffer = Array.ofDim[Float](m * n)
           }
-          pq.foreach { case (dstId, score) =>
-            output(i) = (srcId, dstId, score)
-            i += 1
+
+          BLAS.f2jBLAS.sgemm("T", "N", m, n, rank, 1.0F,
+            srcMat, rank, dstMat, rank, 0.0F, buffer, m)
+
+          Iterator.range(0, m).flatMap { i =>
+            val srcId = srcIds(i)
+            pq.clear()
+            var j = 0
+            while (j < n) { pq += dstIds(j) -> buffer(i + j * m); j += 1 }
+            pq.iterator.map { case (dstId, value) => (srcId, dstId, value) }
           }
+        } ++ {
+          buffer = null
           pq.clear()
+          Iterator.empty
         }
-        output.toSeq
       }
 
+    // We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output.
     val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
     val recs = ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
@@ -499,9 +508,12 @@ class ALSModel private[ml] (
    */
   private def blockify(
       factors: Dataset[(Int, Array[Float])],
-      blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = {
+      blockSize: Int): Dataset[(Array[Int], Array[Float])] = {
     import factors.sparkSession.implicits._
-    factors.mapPartitions(_.grouped(blockSize))
+    factors.mapPartitions { iter =>
+      iter.grouped(blockSize)
+        .map(block => (block.map(_._1).toArray, block.flatMap(_._2).toArray))
+    }
   }
 }
 

From b6459683e734af3749be3b9ce6047ca22fabfdd9 Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Mon, 23 Nov 2020 17:19:11 +0800
Subject: [PATCH 2/6] use gemv

---
 .../org/apache/spark/ml/recommendation/ALS.scala | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index cf9a35253eca..748f91f34eca 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -469,18 +469,18 @@ class ALSModel private[ml] (
           require(dstMat.length == dstIds.length * rank)
           val m = srcIds.length
           val n = dstIds.length
-          if (buffer == null || buffer.length < m * n) {
-            buffer = Array.ofDim[Float](m * n)
+          if (buffer == null || buffer.length < n) {
+            buffer = Array.ofDim[Float](n)
           }
 
-          BLAS.f2jBLAS.sgemm("T", "N", m, n, rank, 1.0F,
-            srcMat, rank, dstMat, rank, 0.0F, buffer, m)
-
-          Iterator.range(0, m).flatMap { i =>
-            val srcId = srcIds(i)
+          Iterator.range(0, m).flatMap { i =>
+            BLAS.f2jBLAS.sgemv("T", rank, n, 1.0F, dstMat, 0, rank,
+              srcMat, i * rank, 1, 0.0F, buffer, 0, 1)
+
             pq.clear()
             var j = 0
-            while (j < n) { pq += dstIds(j) -> buffer(i + j * m); j += 1 }
+            while (j < n) { pq += dstIds(j) -> buffer(j); j += 1 }
+            val srcId = srcIds(i)
             pq.iterator.map { case (dstId, value) => (srcId, dstId, value) }
           }
         } ++ {

From 7861b7bec7c21ceefb31a5e01d5df705eaac7a99 Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Mon, 23 Nov 2020 17:25:08 +0800
Subject: [PATCH 3/6] use array agg

---
 .../apache/spark/ml/recommendation/ALS.scala  | 15 +++---
 .../recommendation/TopByKeyAggregator.scala   | 54 +++++++++++++++++++
 2 files changed, 62 insertions(+), 7 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 748f91f34eca..043e726e150d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -459,7 +459,7 @@ class ALSModel private[ml] (
 
     val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
     val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
-    val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
+    val partialRecs = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
       .as[(Array[Int], Array[Float], Array[Int], Array[Float])]
       .mapPartitions { iter =>
         var buffer: Array[Float] = null
@@ -473,15 +473,16 @@ class ALSModel private[ml] (
            buffer = Array.ofDim[Float](n)
          }
 
-          Iterator.range(0, m).flatMap { i =>
+          Iterator.tabulate(m) { i =>
+            // buffer = i-th vec in srcMat * dstMat
             BLAS.f2jBLAS.sgemv("T", rank, n, 1.0F, dstMat, 0, rank,
               srcMat, i * rank, 1, 0.0F, buffer, 0, 1)
 
             pq.clear()
             var j = 0
             while (j < n) { pq += dstIds(j) -> buffer(j); j += 1 }
-            val srcId = srcIds(i)
-            pq.iterator.map { case (dstId, value) => (srcId, dstId, value) }
+            val (kDstIds, kScores) = pq.toArray.sortBy(-_._2).unzip
+            (srcIds(i), kDstIds, kScores)
           }
         } ++ {
           buffer = null
@@ -490,9 +491,9 @@ class ALSModel private[ml] (
         }
       }
 
-    // We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output.
-    val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
-    val recs = ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
+    val aggregator = new TopKArrayAggregator(num)
+    val recs = partialRecs.as[(Int, Array[Int], Array[Float])]
+      .groupByKey(_._1).agg(aggregator.toColumn)
       .toDF("id", "recommendations")
 
     val arrayType = ArrayType(
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
index ed41169070c5..639e37f40a0a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
@@ -57,3 +57,57 @@ private[recommendation] class TopByKeyAggregator[K1: TypeTag, K2: TypeTag, V: Ty
 
   override def outputEncoder: Encoder[Array[(K2, V)]] = ExpressionEncoder[Array[(K2, V)]]()
 }
+
+
+/**
+ * Works on rows of the form (SrcId, DstIds, Scores).
+ * Finds the top `num` DstIds and Scores.
+ */
+private[recommendation] class TopKArrayAggregator(num: Int)
+  extends Aggregator[
+    (Int, Array[Int], Array[Float]),
+    (Array[Int], Array[Float]),
+    Array[(Int, Float)]] {
+
+  override def zero: (Array[Int], Array[Float]) = {
+    (Array.emptyIntArray, Array.emptyFloatArray)
+  }
+
+  override def reduce(
+      b: (Array[Int], Array[Float]),
+      a: (Int, Array[Int], Array[Float])): (Array[Int], Array[Float]) = {
+    merge(b, (a._2, a._3))
+  }
+
+  def merge(
+      b1: (Array[Int], Array[Float]),
+      b2: (Array[Int], Array[Float])): (Array[Int], Array[Float]) = {
+    val (ids1, scores1) = b1
+    val (ids2, scores2) = b2
+    if (ids1.isEmpty) {
+      b2
+    } else if (ids2.isEmpty) {
+      b1
+    } else {
+      val len1 = ids1.length
+      val len2 = ids2.length
+      val indices = Array.range(0, len1 + len2)
+        .sortBy(i => if (i < len1) -scores1(i) else -scores2(i - len1))
+        .take(num)
+      (indices.map(i => if (i < len1) ids1(i) else ids2(i - len1)),
+        indices.map(i => if (i < len1) scores1(i) else scores2(i - len1)))
+    }
+  }
+
+  override def finish(reduction: (Array[Int], Array[Float])): Array[(Int, Float)] = {
+    reduction._1.zip(reduction._2)
+  }
+
+  override def bufferEncoder: Encoder[(Array[Int], Array[Float])] = {
+    Encoders.kryo[(Array[Int], Array[Float])]
+  }
+
+  override def outputEncoder: Encoder[Array[(Int, Float)]] = {
+    ExpressionEncoder[Array[(Int, Float)]]()
+  }
+}

From 7dd2b919b7149e5064cc1bd90100db52feb6cba2 Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Mon, 23 Nov 2020 18:53:28 +0800
Subject: [PATCH 4/6] use guava ordering

---
 .../apache/spark/ml/recommendation/ALS.scala | 32 +++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 043e726e150d..73cb766d3c9c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -27,6 +27,7 @@ import scala.util.{Sorting, Try}
 import scala.util.hashing.byteswap64
 
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
+import com.google.common.collect.{Ordering => GuavaOrdering}
 import org.apache.hadoop.fs.Path
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
@@ -47,7 +48,7 @@ import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.{OpenHashMap, OpenHashSet, SortDataFormat, Sorter}
 import org.apache.spark.util.random.XORShiftRandom
 
@@ -456,6 +457,7 @@ class ALSModel private[ml] (
       num: Int,
       blockSize: Int): DataFrame = {
     import srcFactors.sparkSession.implicits._
+    import ALSModel.TopSelector
 
     val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
     val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
@@ -463,7 +465,7 @@ class ALSModel private[ml] (
       .as[(Array[Int], Array[Float], Array[Int], Array[Float])]
       .mapPartitions { iter =>
         var buffer: Array[Float] = null
-        val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
+        var selector: TopSelector = null
         iter.flatMap { case (srcIds, srcMat, dstIds, dstMat) =>
           require(srcMat.length == srcIds.length * rank)
           require(dstMat.length == dstIds.length * rank)
@@ -471,22 +473,19 @@ class ALSModel private[ml] (
           val n = dstIds.length
           if (buffer == null || buffer.length < n) {
             buffer = Array.ofDim[Float](n)
+            selector = new TopSelector(buffer)
           }
 
           Iterator.tabulate(m) { i =>
             // buffer = i-th vec in srcMat * dstMat
             BLAS.f2jBLAS.sgemv("T", rank, n, 1.0F, dstMat, 0, rank,
               srcMat, i * rank, 1, 0.0F, buffer, 0, 1)
-
-            pq.clear()
-            var j = 0
-            while (j < n) { pq += dstIds(j) -> buffer(j); j += 1 }
-            val (kDstIds, kScores) = pq.toArray.sortBy(-_._2).unzip
-            (srcIds(i), kDstIds, kScores)
+            val indices = selector.selectTopKIndices(Iterator.range(0, n), num)
+            (srcIds(i), indices.map(dstIds), indices.map(buffer))
           }
         } ++ {
           buffer = null
-          pq.clear()
+          selector = null
           Iterator.empty
         }
       }
@@ -564,6 +563,21 @@ object ALSModel extends MLReadable[ALSModel] {
       model
     }
   }
+
+  /** select top indices based on values. */
+  private[recommendation] class TopSelector(val values: Array[Float]) {
+    import scala.collection.JavaConverters._
+
+    private val indexOrdering = new GuavaOrdering[Int] {
+      override def compare(left: Int, right: Int): Int = {
+        Ordering[Float].compare(values(left), values(right))
+      }
+    }
+
+    def selectTopKIndices(iterator: Iterator[Int], k: Int): Array[Int] = {
+      indexOrdering.greatestOf(iterator.asJava, k).asScala.toArray
+    }
+  }
 }
 
 /**

From 543a41fe87ca1935e4eb6c86012e80b2b3b90b70 Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Tue, 24 Nov 2020 10:53:18 +0800
Subject: [PATCH 5/6] back to TopByKeyAggregator

---
 .../apache/spark/ml/recommendation/ALS.scala  | 21 ++++----
 .../recommendation/TopByKeyAggregator.scala   | 54 -------------------
 2 files changed, 9 insertions(+), 66 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 73cb766d3c9c..857f41133766 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -461,7 +461,7 @@ class ALSModel private[ml] (
 
     val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
     val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
-    val partialRecs = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
+    val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
       .as[(Array[Int], Array[Float], Array[Int], Array[Float])]
       .mapPartitions { iter =>
         var buffer: Array[Float] = null
@@ -476,23 +476,20 @@ class ALSModel private[ml] (
             selector = new TopSelector(buffer)
           }
 
-          Iterator.tabulate(m) { i =>
+          Iterator.range(0, m).flatMap { i =>
             // buffer = i-th vec in srcMat * dstMat
             BLAS.f2jBLAS.sgemv("T", rank, n, 1.0F, dstMat, 0, rank,
               srcMat, i * rank, 1, 0.0F, buffer, 0, 1)
-            val indices = selector.selectTopKIndices(Iterator.range(0, n), num)
-            (srcIds(i), indices.map(dstIds), indices.map(buffer))
+
+            val srcId = srcIds(i)
+            selector.selectTopKIndices(Iterator.range(0, n), num)
+              .iterator.map { j => (srcId, dstIds(j), buffer(j)) }
           }
-        } ++ {
-          buffer = null
-          selector = null
-          Iterator.empty
         }
       }
-
-    val aggregator = new TopKArrayAggregator(num)
-    val recs = partialRecs.as[(Int, Array[Int], Array[Float])]
-      .groupByKey(_._1).agg(aggregator.toColumn)
+    // We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output.
+    val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
+    val recs = ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
       .toDF("id", "recommendations")
 
     val arrayType = ArrayType(
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
index 639e37f40a0a..ed41169070c5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/TopByKeyAggregator.scala
@@ -57,57 +57,3 @@ private[recommendation] class TopByKeyAggregator[K1: TypeTag, K2: TypeTag, V: Ty
 
   override def outputEncoder: Encoder[Array[(K2, V)]] = ExpressionEncoder[Array[(K2, V)]]()
 }
-
-
-/**
- * Works on rows of the form (SrcId, DstIds, Scores).
- * Finds the top `num` DstIds and Scores.
- */
-private[recommendation] class TopKArrayAggregator(num: Int)
-  extends Aggregator[
-    (Int, Array[Int], Array[Float]),
-    (Array[Int], Array[Float]),
-    Array[(Int, Float)]] {
-
-  override def zero: (Array[Int], Array[Float]) = {
-    (Array.emptyIntArray, Array.emptyFloatArray)
-  }
-
-  override def reduce(
-      b: (Array[Int], Array[Float]),
-      a: (Int, Array[Int], Array[Float])): (Array[Int], Array[Float]) = {
-    merge(b, (a._2, a._3))
-  }
-
-  def merge(
-      b1: (Array[Int], Array[Float]),
-      b2: (Array[Int], Array[Float])): (Array[Int], Array[Float]) = {
-    val (ids1, scores1) = b1
-    val (ids2, scores2) = b2
-    if (ids1.isEmpty) {
-      b2
-    } else if (ids2.isEmpty) {
-      b1
-    } else {
-      val len1 = ids1.length
-      val len2 = ids2.length
-      val indices = Array.range(0, len1 + len2)
-        .sortBy(i => if (i < len1) -scores1(i) else -scores2(i - len1))
-        .take(num)
-      (indices.map(i => if (i < len1) ids1(i) else ids2(i - len1)),
-        indices.map(i => if (i < len1) scores1(i) else scores2(i - len1)))
-    }
-  }
-
-  override def finish(reduction: (Array[Int], Array[Float])): Array[(Int, Float)] = {
-    reduction._1.zip(reduction._2)
-  }
-
-  override def bufferEncoder: Encoder[(Array[Int], Array[Float])] = {
-    Encoders.kryo[(Array[Int], Array[Float])]
-  }
-
-  override def outputEncoder: Encoder[Array[(Int, Float)]] = {
-    ExpressionEncoder[Array[(Int, Float)]]()
-  }
-}

From 8ca7d562c20812062e11e8f6961034157cc08ea8 Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Tue, 24 Nov 2020 15:45:39 +0800
Subject: [PATCH 6/6] remove selector & directly use GuavaOrdering

---
 .../apache/spark/ml/recommendation/ALS.scala | 37 +++++++------------
 1 file changed, 13 insertions(+), 24 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 857f41133766..1b856bda45e2 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -457,33 +457,37 @@ class ALSModel private[ml] (
       num: Int,
       blockSize: Int): DataFrame = {
     import srcFactors.sparkSession.implicits._
-    import ALSModel.TopSelector
+    import scala.collection.JavaConverters._
 
     val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
     val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
     val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
       .as[(Array[Int], Array[Float], Array[Int], Array[Float])]
       .mapPartitions { iter =>
-        var buffer: Array[Float] = null
-        var selector: TopSelector = null
+        var scores: Array[Float] = null
+        var idxOrd: GuavaOrdering[Int] = null
         iter.flatMap { case (srcIds, srcMat, dstIds, dstMat) =>
           require(srcMat.length == srcIds.length * rank)
           require(dstMat.length == dstIds.length * rank)
           val m = srcIds.length
           val n = dstIds.length
-          if (buffer == null || buffer.length < n) {
-            buffer = Array.ofDim[Float](n)
-            selector = new TopSelector(buffer)
+          if (scores == null || scores.length < n) {
+            scores = Array.ofDim[Float](n)
+            idxOrd = new GuavaOrdering[Int] {
+              override def compare(left: Int, right: Int): Int = {
+                Ordering[Float].compare(scores(left), scores(right))
+              }
+            }
           }
 
           Iterator.range(0, m).flatMap { i =>
-            // buffer = i-th vec in srcMat * dstMat
+            // scores = i-th vec in srcMat * dstMat
             BLAS.f2jBLAS.sgemv("T", rank, n, 1.0F, dstMat, 0, rank,
-              srcMat, i * rank, 1, 0.0F, buffer, 0, 1)
+              srcMat, i * rank, 1, 0.0F, scores, 0, 1)
 
             val srcId = srcIds(i)
-            selector.selectTopKIndices(Iterator.range(0, n), num)
-              .iterator.map { j => (srcId, dstIds(j), buffer(j)) }
+            idxOrd.greatestOf(Iterator.range(0, n).asJava, num).asScala
+              .iterator.map { j => (srcId, dstIds(j), scores(j)) }
           }
         }
       }
@@ -560,21 +564,6 @@ object ALSModel extends MLReadable[ALSModel] {
       model
     }
   }
-
-  /** select top indices based on values. */
-  private[recommendation] class TopSelector(val values: Array[Float]) {
-    import scala.collection.JavaConverters._
-
-    private val indexOrdering = new GuavaOrdering[Int] {
-      override def compare(left: Int, right: Int): Int = {
-        Ordering[Float].compare(values(left), values(right))
-      }
-    }
-
-    def selectTopKIndices(iterator: Iterator[Int], k: Int): Array[Int] = {
-      indexOrdering.greatestOf(iterator.asJava, k).asScala.toArray
-    }
-  }
 }
 
 /**
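
Reference sketch (not part of the patches): as a self-contained picture of what the per-block kernel computes after PATCH 6, here is a minimal, dependency-free Scala sketch of the scoring and top-k step. It is illustrative only: a hand-rolled dot-product loop stands in for the BLAS.f2jBLAS.sgemv call, a full index sort stands in for GuavaOrdering.greatestOf, and the names RecommendBlockSketch and recommendBlock are invented for the example.

import scala.util.Random

object RecommendBlockSketch {

  // For one (source block, destination block) pair: score every destination
  // factor against each source factor, then keep the top `num` per source id.
  // Factor matrices are row-major with `rank` floats per id, as produced by
  // the patched blockify.
  def recommendBlock(
      rank: Int,
      num: Int,
      srcIds: Array[Int],
      srcMat: Array[Float], // srcIds.length x rank, row-major
      dstIds: Array[Int],
      dstMat: Array[Float]  // dstIds.length x rank, row-major
  ): Iterator[(Int, Int, Float)] = {
    require(srcMat.length == srcIds.length * rank)
    require(dstMat.length == dstIds.length * rank)
    val n = dstIds.length
    val scores = new Array[Float](n) // reused across source rows, like the patch's buffer

    Iterator.range(0, srcIds.length).flatMap { i =>
      // scores = i-th row of srcMat times dstMat transposed
      // (this loop plays the role of the sgemv call in the patch)
      var j = 0
      while (j < n) {
        var dot = 0.0f
        var k = 0
        while (k < rank) {
          dot += srcMat(i * rank + k) * dstMat(j * rank + k)
          k += 1
        }
        scores(j) = dot
        j += 1
      }
      val srcId = srcIds(i)
      // indices of the `num` largest scores, descending; Guava's greatestOf
      // achieves the same in O(n log num) instead of a full O(n log n) sort
      Array.range(0, n).sortBy(d => -scores(d)).take(num)
        .iterator.map(d => (srcId, dstIds(d), scores(d)))
    }
  }

  def main(args: Array[String]): Unit = {
    val rank = 4
    val rng = new Random(42)
    val srcIds = Array(1, 2)
    val dstIds = Array(10, 11, 12, 13)
    val srcMat = Array.fill(srcIds.length * rank)(rng.nextFloat())
    val dstMat = Array.fill(dstIds.length * rank)(rng.nextFloat())
    // prints two (srcId, dstId, score) triples per source id
    recommendBlock(rank, num = 2, srcIds, srcMat, dstIds, dstMat).foreach(println)
  }
}

The reused scores buffer mirrors the partition-level buffer in the patches; the pattern is safe because Iterator.flatMap fully drains each per-row iterator before the closure runs for the next row and overwrites the buffer, which is the same property the final implementation relies on.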