From 8bbaea0a3d3f982d4d231b8d1712bdf4c2408732 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 22 Jan 2015 23:27:04 -0800 Subject: [PATCH 1/8] make ALS take generic IDs --- .../apache/spark/ml/recommendation/ALS.scala | 154 ++++++++++-------- 1 file changed, 88 insertions(+), 66 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 2d89e76a4c8b..e448cb1e3255 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -102,12 +102,12 @@ private[recommendation] trait ALSParams extends Params with HasMaxIter with HasR /** * Model fitted by ALS. */ -class ALSModel private[ml] ( - override val parent: ALS, +class ALSModel[@specialized(Int, Long) UserType, @specialized(Int, Long) ItemType] private[ml] ( + override val parent: ALS[UserType, ItemType], override val fittingParamMap: ParamMap, k: Int, - userFactors: RDD[(Int, Array[Float])], - itemFactors: RDD[(Int, Array[Float])]) + userFactors: RDD[(UserType, Array[Float])], + itemFactors: RDD[(ItemType, Array[Float])]) extends Model[ALSModel] with ALSParams { def setPredictionCol(value: String): this.type = set(predictionCol, value) @@ -151,7 +151,7 @@ class ALSModel private[ml] ( private object ALSModel { /** Case class to convert factors to SchemaRDDs */ - private case class Factor(id: Int, features: Seq[Float]) + private case class Factor[@specialized(Int, Long) IDType](id: IDType, features: Seq[Float]) } /** @@ -183,7 +183,8 @@ private object ALSModel { * indicated user * preferences rather than explicit ratings given to items. */ -class ALS extends Estimator[ALSModel] with ALSParams { +class ALS[@specialized(Int, Long) UserType, @specialized(Int, Long) ItemType] + extends Estimator[ALSModel] with ALSParams { import org.apache.spark.ml.recommendation.ALS.Rating @@ -209,7 +210,7 @@ class ALS extends Estimator[ALSModel] with ALSParams { setMaxIter(20) setRegParam(1.0) - override def fit(dataset: SchemaRDD, paramMap: ParamMap): ALSModel = { + override def fit(dataset: SchemaRDD, paramMap: ParamMap): ALSModel[_, _] = { import dataset.sqlContext._ val map = this.paramMap ++ paramMap val ratings = @@ -234,7 +235,10 @@ class ALS extends Estimator[ALSModel] with ALSParams { private[recommendation] object ALS extends Logging { /** Rating class for better code readability. */ - private[recommendation] case class Rating(user: Int, item: Int, rating: Float) + private[recommendation] + case class Rating[@specialized(Int, Long) UserType, @specialized(Int, Long) ItemType]( + user: UserType, + item: ItemType, rating: Float) /** Cholesky solver for least square problems. */ private[recommendation] class CholeskySolver { @@ -346,15 +350,15 @@ private[recommendation] object ALS extends Logging { /** * Implementation of the ALS algorithm. 
*/ - private def train( - ratings: RDD[Rating], + private def train[@specialized(Int, Long) UserType, @specialized(Int, Long) ItemType]( + ratings: RDD[Rating[UserType, ItemType]], rank: Int = 10, numUserBlocks: Int = 10, numItemBlocks: Int = 10, maxIter: Int = 10, regParam: Double = 1.0, implicitPrefs: Boolean = false, - alpha: Double = 1.0): (RDD[(Int, Array[Float])], RDD[(Int, Array[Float])]) = { + alpha: Double = 1.0): (RDD[(UserType, Array[Float])], RDD[(ItemType, Array[Float])]) = { val userPart = new HashPartitioner(numUserBlocks) val itemPart = new HashPartitioner(numItemBlocks) val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions) @@ -457,8 +461,8 @@ private[recommendation] object ALS extends Logging { * * @see [[LocalIndexEncoder]] */ - private[recommendation] case class InBlock( - srcIds: Array[Int], + private[recommendation] case class InBlock[@specialized(Int, Long) SrcType]( + srcIds: Array[SrcType], dstPtrs: Array[Int], dstEncodedIndices: Array[Int], ratings: Array[Float]) { @@ -476,7 +480,9 @@ private[recommendation] object ALS extends Logging { * @param rank rank * @return initialized factor blocks */ - private def initialize(inBlocks: RDD[(Int, InBlock)], rank: Int): RDD[(Int, FactorBlock)] = { + private def initialize[@specialized(Int, Long) SrcType]( + inBlocks: RDD[(Int, InBlock[SrcType])], + rank: Int): RDD[(Int, FactorBlock)] = { // Choose a unit vector uniformly at random from the unit sphere, but from the // "first quadrant" where all elements are nonnegative. This can be done by choosing // elements distributed as Normal(0,1) and taking the absolute value, and then normalizing. @@ -498,7 +504,10 @@ private[recommendation] object ALS extends Logging { * A rating block that contains src IDs, dst IDs, and ratings, stored in primitive arrays. */ private[recommendation] - case class RatingBlock(srcIds: Array[Int], dstIds: Array[Int], ratings: Array[Float]) { + case class RatingBlock[@specialized(Int, Long) SrcType, @specialized(Int, Long) DstType]( + srcIds: Array[SrcType], + dstIds: Array[DstType], + ratings: Array[Float]) { /** Size of the block. */ val size: Int = srcIds.size require(dstIds.size == size) @@ -508,15 +517,17 @@ private[recommendation] object ALS extends Logging { /** * Builder for [[RatingBlock]]. [[mutable.ArrayBuilder]] is used to avoid boxing/unboxing. */ - private[recommendation] class RatingBlockBuilder extends Serializable { + private[recommendation] class RatingBlockBuilder[ + @specialized(Int, Long) SrcType, + @specialized(Int, Long) DstType] extends Serializable { - private val srcIds = mutable.ArrayBuilder.make[Int] - private val dstIds = mutable.ArrayBuilder.make[Int] + private val srcIds = mutable.ArrayBuilder.make[SrcType] + private val dstIds = mutable.ArrayBuilder.make[DstType] private val ratings = mutable.ArrayBuilder.make[Float] var size = 0 /** Adds a rating. */ - def add(r: Rating): this.type = { + def add(r: Rating[SrcType, DstType]): this.type = { size += 1 srcIds += r.user dstIds += r.item @@ -525,7 +536,7 @@ private[recommendation] object ALS extends Logging { } /** Merges another [[RatingBlockBuilder]]. */ - def merge(other: RatingBlock): this.type = { + def merge(other: RatingBlock[SrcType, DstType]): this.type = { size += other.srcIds.size srcIds ++= other.srcIds dstIds ++= other.dstIds @@ -534,8 +545,8 @@ private[recommendation] object ALS extends Logging { } /** Builds a [[RatingBlock]]. 
*/ - def build(): RatingBlock = { - RatingBlock(srcIds.result(), dstIds.result(), ratings.result()) + def build(): RatingBlock[SrcType, DstType] = { + RatingBlock[SrcType, DstType](srcIds.result(), dstIds.result(), ratings.result()) } } @@ -548,10 +559,10 @@ private[recommendation] object ALS extends Logging { * * @return an RDD of rating blocks in the form of ((srcBlockId, dstBlockId), ratingBlock) */ - private def partitionRatings( - ratings: RDD[Rating], + private def partitionRatings[@specialized(Int, Long) UserType, @specialized(Int, Long) ItemType]( + ratings: RDD[Rating[UserType, ItemType]], srcPart: Partitioner, - dstPart: Partitioner): RDD[((Int, Int), RatingBlock)] = { + dstPart: Partitioner): RDD[((Int, Int), RatingBlock[UserType, ItemType])] = { /* The implementation produces the same result as the following but generates less objects. @@ -565,7 +576,7 @@ private[recommendation] object ALS extends Logging { val numPartitions = srcPart.numPartitions * dstPart.numPartitions ratings.mapPartitions { iter => - val builders = Array.fill(numPartitions)(new RatingBlockBuilder) + val builders = Array.fill(numPartitions)(new RatingBlockBuilder[UserType, ItemType]) iter.flatMap { r => val srcBlockId = srcPart.getPartition(r.user) val dstBlockId = dstPart.getPartition(r.item) @@ -586,7 +597,7 @@ private[recommendation] object ALS extends Logging { } } }.groupByKey().mapValues { blocks => - val builder = new RatingBlockBuilder + val builder = new RatingBlockBuilder[UserType, ItemType] blocks.foreach(builder.merge) builder.build() }.setName("ratingBlocks") @@ -596,9 +607,10 @@ private[recommendation] object ALS extends Logging { * Builder for uncompressed in-blocks of (srcId, dstEncodedIndex, rating) tuples. * @param encoder encoder for dst indices */ - private[recommendation] class UncompressedInBlockBuilder(encoder: LocalIndexEncoder) { + private[recommendation] class UncompressedInBlockBuilder[@specialized(Int, Long) SrcType]( + encoder: LocalIndexEncoder) { - private val srcIds = mutable.ArrayBuilder.make[Int] + private val srcIds = mutable.ArrayBuilder.make[SrcType] private val dstEncodedIndices = mutable.ArrayBuilder.make[Int] private val ratings = mutable.ArrayBuilder.make[Float] @@ -612,7 +624,7 @@ private[recommendation] object ALS extends Logging { */ def add( dstBlockId: Int, - srcIds: Array[Int], + srcIds: Array[SrcType], dstLocalIndices: Array[Int], ratings: Array[Float]): this.type = { val sz = srcIds.size @@ -629,7 +641,7 @@ private[recommendation] object ALS extends Logging { } /** Builds a [[UncompressedInBlock]]. */ - def build(): UncompressedInBlock = { + def build(): UncompressedInBlock[SrcType] = { new UncompressedInBlock(srcIds.result(), dstEncodedIndices.result(), ratings.result()) } } @@ -637,8 +649,8 @@ private[recommendation] object ALS extends Logging { /** * A block of (srcId, dstEncodedIndex, rating) tuples stored in primitive arrays. */ - private[recommendation] class UncompressedInBlock( - val srcIds: Array[Int], + private[recommendation] class UncompressedInBlock[@specialized(Int, Long) SrcType]( + val srcIds: Array[SrcType], val dstEncodedIndices: Array[Int], val ratings: Array[Float]) { @@ -650,11 +662,11 @@ private[recommendation] object ALS extends Logging { * sparse matrix from coordinate list (COO) format into compressed sparse column (CSC) format. * Sorting is done using Spark's built-in Timsort to avoid generating too many objects. 
*/ - def compress(): InBlock = { + def compress(): InBlock[SrcType] = { val sz = size assert(sz > 0, "Empty in-link block should not exist.") sort() - val uniqueSrcIdsBuilder = mutable.ArrayBuilder.make[Int] + val uniqueSrcIdsBuilder = mutable.ArrayBuilder.make[SrcType] val dstCountsBuilder = mutable.ArrayBuilder.make[Int] var preSrcId = srcIds(0) uniqueSrcIdsBuilder += preSrcId @@ -694,46 +706,55 @@ private[recommendation] object ALS extends Logging { val sortId = Utils.random.nextInt() logDebug(s"Start sorting an uncompressed in-block of size $sz. (sortId = $sortId)") val start = System.nanoTime() - val sorter = new Sorter(new UncompressedInBlockSort) - sorter.sort(this, 0, size, Ordering[IntWrapper]) + val sorter = new Sorter(new UncompressedInBlockSort[SrcType]) + sorter.sort(this, 0, size, Ordering[KeyWrapper[SrcType]]) val duration = (System.nanoTime() - start) / 1e9 logDebug(s"Sorting took $duration seconds. (sortId = $sortId)") } } /** - * A wrapper that holds a primitive integer key. + * A wrapper that holds a primitive key. * * @see [[UncompressedInBlockSort]] */ - private class IntWrapper(var key: Int = 0) extends Ordered[IntWrapper] { - override def compare(that: IntWrapper): Int = { + private class KeyWrapper[@specialized(Int, Long) KeyType <: Ordered[KeyType]] + extends Ordered[KeyWrapper[KeyType]] { + + private var key: KeyType = _ + + override def compare(that: KeyWrapper[KeyType]): Int = { key.compare(that.key) } + + def setKey(key: KeyType): this.type = { + this.key = key + this + } } /** * [[SortDataFormat]] of [[UncompressedInBlock]] used by [[Sorter]]. */ - private class UncompressedInBlockSort extends SortDataFormat[IntWrapper, UncompressedInBlock] { + private class UncompressedInBlockSort[@specialized(Int, Long) SrcType] + extends SortDataFormat[KeyWrapper[SrcType], UncompressedInBlock[SrcType]] { - override def newKey(): IntWrapper = new IntWrapper() + override def newKey(): KeyWrapper[SrcType] = new KeyWrapper() override def getKey( - data: UncompressedInBlock, + data: UncompressedInBlock[SrcType], pos: Int, - reuse: IntWrapper): IntWrapper = { + reuse: KeyWrapper[SrcType]): KeyWrapper[SrcType] = { if (reuse == null) { - new IntWrapper(data.srcIds(pos)) + new KeyWrapper().setKey(data.srcIds(pos)) } else { - reuse.key = data.srcIds(pos) - reuse + reuse.setKey(data.srcIds(pos)) } } override def getKey( - data: UncompressedInBlock, - pos: Int): IntWrapper = { + data: UncompressedInBlock[SrcType], + pos: Int): KeyWrapper[SrcType] = { getKey(data, pos, null) } @@ -746,16 +767,16 @@ private[recommendation] object ALS extends Logging { data(pos1) = tmp } - override def swap(data: UncompressedInBlock, pos0: Int, pos1: Int): Unit = { + override def swap(data: UncompressedInBlock[SrcType], pos0: Int, pos1: Int): Unit = { swapElements(data.srcIds, pos0, pos1) swapElements(data.dstEncodedIndices, pos0, pos1) swapElements(data.ratings, pos0, pos1) } override def copyRange( - src: UncompressedInBlock, + src: UncompressedInBlock[SrcType], srcPos: Int, - dst: UncompressedInBlock, + dst: UncompressedInBlock[SrcType], dstPos: Int, length: Int): Unit = { System.arraycopy(src.srcIds, srcPos, dst.srcIds, dstPos, length) @@ -763,15 +784,15 @@ private[recommendation] object ALS extends Logging { System.arraycopy(src.ratings, srcPos, dst.ratings, dstPos, length) } - override def allocate(length: Int): UncompressedInBlock = { + override def allocate(length: Int): UncompressedInBlock[SrcType] = { new UncompressedInBlock( - new Array[Int](length), new Array[Int](length), new 
Array[Float](length)) + new Array[SrcType](length), new Array[Int](length), new Array[Float](length)) } override def copyElement( - src: UncompressedInBlock, + src: UncompressedInBlock[SrcType], srcPos: Int, - dst: UncompressedInBlock, + dst: UncompressedInBlock[SrcType], dstPos: Int): Unit = { dst.srcIds(dstPos) = src.srcIds(srcPos) dst.dstEncodedIndices(dstPos) = src.dstEncodedIndices(srcPos) @@ -787,19 +808,20 @@ private[recommendation] object ALS extends Logging { * @param dstPart partitioner for dst IDs * @return (in-blocks, out-blocks) */ - private def makeBlocks( + private def makeBlocks[@specialized(Int, Long) SrcType, @specialized(Int, Long) DstType]( prefix: String, - ratingBlocks: RDD[((Int, Int), RatingBlock)], + ratingBlocks: RDD[((Int, Int), RatingBlock[SrcType, DstType])], srcPart: Partitioner, - dstPart: Partitioner): (RDD[(Int, InBlock)], RDD[(Int, OutBlock)]) = { + dstPart: Partitioner)( + implicit ord: Ordering[DstType]): (RDD[(Int, InBlock[SrcType])], RDD[(Int, OutBlock)]) = { val inBlocks = ratingBlocks.map { case ((srcBlockId, dstBlockId), RatingBlock(srcIds, dstIds, ratings)) => // The implementation is a faster version of // val dstIdToLocalIndex = dstIds.toSet.toSeq.sorted.zipWithIndex.toMap val start = System.nanoTime() - val dstIdSet = new OpenHashSet[Int](1 << 20) + val dstIdSet = new OpenHashSet[DstType](1 << 20) dstIds.foreach(dstIdSet.add) - val sortedDstIds = new Array[Int](dstIdSet.size) + val sortedDstIds = new Array[DstType](dstIdSet.size) var i = 0 var pos = dstIdSet.nextPos(0) while (pos != -1) { @@ -808,8 +830,8 @@ private[recommendation] object ALS extends Logging { i += 1 } assert(i == dstIdSet.size) - ju.Arrays.sort(sortedDstIds) - val dstIdToLocalIndex = new OpenHashMap[Int, Int](sortedDstIds.size) + ju.Arrays.sort(sortedDstIds, ord) + val dstIdToLocalIndex = new OpenHashMap[DstType, Int](sortedDstIds.size) i = 0 while (i < sortedDstIds.size) { dstIdToLocalIndex.update(sortedDstIds(i), i) @@ -822,7 +844,7 @@ private[recommendation] object ALS extends Logging { }.groupByKey(new HashPartitioner(srcPart.numPartitions)) .mapValues { iter => val builder = - new UncompressedInBlockBuilder(new LocalIndexEncoder(dstPart.numPartitions)) + new UncompressedInBlockBuilder[SrcType](new LocalIndexEncoder(dstPart.numPartitions)) iter.foreach { case (dstBlockId, srcIds, dstLocalIndices, ratings) => builder.add(dstBlockId, srcIds, dstLocalIndices, ratings) } @@ -867,10 +889,10 @@ private[recommendation] object ALS extends Logging { * * @return dst factors */ - private def computeFactors( + private def computeFactors[@specialized(Int, Long) SrcType]( srcFactorBlocks: RDD[(Int, FactorBlock)], srcOutBlocks: RDD[(Int, OutBlock)], - dstInBlocks: RDD[(Int, InBlock)], + dstInBlocks: RDD[(Int, InBlock[SrcType])], rank: Int, regParam: Double, srcEncoder: LocalIndexEncoder, From 72b50065ee6577f8f87b65785887fce44f7005e9 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 29 Jan 2015 09:39:18 -0800 Subject: [PATCH 2/8] remove generic from pipeline interface --- .../org/apache/spark/ml/recommendation/ALS.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index e448cb1e3255..410522f4f1cf 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -102,12 +102,12 @@ private[recommendation] trait ALSParams extends 
Params with HasMaxIter with HasR /** * Model fitted by ALS. */ -class ALSModel[@specialized(Int, Long) UserType, @specialized(Int, Long) ItemType] private[ml] ( - override val parent: ALS[UserType, ItemType], +class ALSModel private[ml] ( + override val parent: ALS, override val fittingParamMap: ParamMap, k: Int, - userFactors: RDD[(UserType, Array[Float])], - itemFactors: RDD[(ItemType, Array[Float])]) + userFactors: RDD[(Int, Array[Float])], + itemFactors: RDD[(Int, Array[Float])]) extends Model[ALSModel] with ALSParams { def setPredictionCol(value: String): this.type = set(predictionCol, value) @@ -151,7 +151,7 @@ class ALSModel[@specialized(Int, Long) UserType, @specialized(Int, Long) ItemTyp private object ALSModel { /** Case class to convert factors to SchemaRDDs */ - private case class Factor[@specialized(Int, Long) IDType](id: IDType, features: Seq[Float]) + private case class Factor(id: Int, features: Seq[Float]) } /** @@ -183,8 +183,7 @@ private object ALSModel { * indicated user * preferences rather than explicit ratings given to items. */ -class ALS[@specialized(Int, Long) UserType, @specialized(Int, Long) ItemType] - extends Estimator[ALSModel] with ALSParams { +class ALS extends Estimator[ALSModel] with ALSParams { import org.apache.spark.ml.recommendation.ALS.Rating @@ -210,7 +209,7 @@ class ALS[@specialized(Int, Long) UserType, @specialized(Int, Long) ItemType] setMaxIter(20) setRegParam(1.0) - override def fit(dataset: SchemaRDD, paramMap: ParamMap): ALSModel[_, _] = { + override def fit(dataset: SchemaRDD, paramMap: ParamMap): ALSModel = { import dataset.sqlContext._ val map = this.paramMap ++ paramMap val ratings = From 7a5aeb3a1ed3c90cee8dc7b5766a9fd497f2559b Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 29 Jan 2015 09:40:31 -0800 Subject: [PATCH 3/8] UserType -> User, ItemType -> Item --- .../apache/spark/ml/recommendation/ALS.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index e274e165df06..9c8716e095d1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -219,9 +219,9 @@ private[recommendation] object ALS extends Logging { /** Rating class for better code readability. */ private[recommendation] - case class Rating[@specialized(Int, Long) UserType, @specialized(Int, Long) ItemType]( - user: UserType, - item: ItemType, rating: Float) + case class Rating[@specialized(Int, Long) User, @specialized(Int, Long) Item]( + user: User, + item: Item, rating: Float) /** Cholesky solver for least square problems. */ private[recommendation] class CholeskySolver { @@ -333,15 +333,15 @@ private[recommendation] object ALS extends Logging { /** * Implementation of the ALS algorithm. 
*/ - private def train[@specialized(Int, Long) UserType, @specialized(Int, Long) ItemType]( - ratings: RDD[Rating[UserType, ItemType]], + private def train[@specialized(Int, Long) User, @specialized(Int, Long) Item]( + ratings: RDD[Rating[User, Item]], rank: Int = 10, numUserBlocks: Int = 10, numItemBlocks: Int = 10, maxIter: Int = 10, regParam: Double = 1.0, implicitPrefs: Boolean = false, - alpha: Double = 1.0): (RDD[(UserType, Array[Float])], RDD[(ItemType, Array[Float])]) = { + alpha: Double = 1.0): (RDD[(User, Array[Float])], RDD[(Item, Array[Float])]) = { val userPart = new HashPartitioner(numUserBlocks) val itemPart = new HashPartitioner(numItemBlocks) val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions) @@ -542,10 +542,10 @@ private[recommendation] object ALS extends Logging { * * @return an RDD of rating blocks in the form of ((srcBlockId, dstBlockId), ratingBlock) */ - private def partitionRatings[@specialized(Int, Long) UserType, @specialized(Int, Long) ItemType]( - ratings: RDD[Rating[UserType, ItemType]], + private def partitionRatings[@specialized(Int, Long) User, @specialized(Int, Long) Item]( + ratings: RDD[Rating[User, Item]], srcPart: Partitioner, - dstPart: Partitioner): RDD[((Int, Int), RatingBlock[UserType, ItemType])] = { + dstPart: Partitioner): RDD[((Int, Int), RatingBlock[User, Item])] = { /* The implementation produces the same result as the following but generates less objects. @@ -559,7 +559,7 @@ private[recommendation] object ALS extends Logging { val numPartitions = srcPart.numPartitions * dstPart.numPartitions ratings.mapPartitions { iter => - val builders = Array.fill(numPartitions)(new RatingBlockBuilder[UserType, ItemType]) + val builders = Array.fill(numPartitions)(new RatingBlockBuilder[User, Item]) iter.flatMap { r => val srcBlockId = srcPart.getPartition(r.user) val dstBlockId = dstPart.getPartition(r.item) @@ -580,7 +580,7 @@ private[recommendation] object ALS extends Logging { } } }.groupByKey().mapValues { blocks => - val builder = new RatingBlockBuilder[UserType, ItemType] + val builder = new RatingBlockBuilder[User, Item] blocks.foreach(builder.merge) builder.build() }.setName("ratingBlocks") From e36469ab8e7ee52501ffcd9f4aedce120acaf05c Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 29 Jan 2015 11:01:04 -0800 Subject: [PATCH 4/8] add classtags and make it compile --- .../apache/spark/ml/recommendation/ALS.scala | 135 ++++++++++-------- .../spark/ml/recommendation/ALSSuite.scala | 20 +-- 2 files changed, 85 insertions(+), 70 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 9c8716e095d1..ab4be6ad4647 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -20,6 +20,8 @@ package org.apache.spark.ml.recommendation import java.{util => ju} import scala.collection.mutable +import scala.reflect.ClassTag +import scala.util.Sorting import com.github.fommil.netlib.BLAS.{getInstance => blas} import com.github.fommil.netlib.LAPACK.{getInstance => lapack} @@ -29,7 +31,7 @@ import org.apache.spark.{HashPartitioner, Logging, Partitioner} import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.param._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.api.scala.dsl._ import 
org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, StructField, StructType} import org.apache.spark.util.Utils @@ -221,7 +223,8 @@ private[recommendation] object ALS extends Logging { private[recommendation] case class Rating[@specialized(Int, Long) User, @specialized(Int, Long) Item]( user: User, - item: Item, rating: Float) + item: Item, + rating: Float) /** Cholesky solver for least square problems. */ private[recommendation] class CholeskySolver { @@ -333,7 +336,9 @@ private[recommendation] object ALS extends Logging { /** * Implementation of the ALS algorithm. */ - private def train[@specialized(Int, Long) User, @specialized(Int, Long) Item]( + private def train[ + @specialized(Int, Long) User: ClassTag, + @specialized(Int, Long) Item: ClassTag]( ratings: RDD[Rating[User, Item]], rank: Int = 10, numUserBlocks: Int = 10, @@ -341,7 +346,9 @@ private[recommendation] object ALS extends Logging { maxIter: Int = 10, regParam: Double = 1.0, implicitPrefs: Boolean = false, - alpha: Double = 1.0): (RDD[(User, Array[Float])], RDD[(Item, Array[Float])]) = { + alpha: Double = 1.0)( + implicit userOrd: Ordering[User], + itemOrd: Ordering[Item]): (RDD[(User, Array[Float])], RDD[(Item, Array[Float])]) = { val userPart = new HashPartitioner(numUserBlocks) val itemPart = new HashPartitioner(numItemBlocks) val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions) @@ -444,8 +451,8 @@ private[recommendation] object ALS extends Logging { * * @see [[LocalIndexEncoder]] */ - private[recommendation] case class InBlock[@specialized(Int, Long) SrcType]( - srcIds: Array[SrcType], + private[recommendation] case class InBlock[@specialized(Int, Long) Src: ClassTag]( + srcIds: Array[Src], dstPtrs: Array[Int], dstEncodedIndices: Array[Int], ratings: Array[Float]) { @@ -463,8 +470,8 @@ private[recommendation] object ALS extends Logging { * @param rank rank * @return initialized factor blocks */ - private def initialize[@specialized(Int, Long) SrcType]( - inBlocks: RDD[(Int, InBlock[SrcType])], + private def initialize[@specialized(Int, Long) Src]( + inBlocks: RDD[(Int, InBlock[Src])], rank: Int): RDD[(Int, FactorBlock)] = { // Choose a unit vector uniformly at random from the unit sphere, but from the // "first quadrant" where all elements are nonnegative. This can be done by choosing @@ -487,9 +494,9 @@ private[recommendation] object ALS extends Logging { * A rating block that contains src IDs, dst IDs, and ratings, stored in primitive arrays. */ private[recommendation] - case class RatingBlock[@specialized(Int, Long) SrcType, @specialized(Int, Long) DstType]( - srcIds: Array[SrcType], - dstIds: Array[DstType], + case class RatingBlock[@specialized(Int, Long) Src, @specialized(Int, Long) Dst]( + srcIds: Array[Src], + dstIds: Array[Dst], ratings: Array[Float]) { /** Size of the block. */ val size: Int = srcIds.size @@ -501,16 +508,16 @@ private[recommendation] object ALS extends Logging { * Builder for [[RatingBlock]]. [[mutable.ArrayBuilder]] is used to avoid boxing/unboxing. 
*/ private[recommendation] class RatingBlockBuilder[ - @specialized(Int, Long) SrcType, - @specialized(Int, Long) DstType] extends Serializable { + @specialized(Int, Long) Src: ClassTag, + @specialized(Int, Long) Dst: ClassTag] extends Serializable { - private val srcIds = mutable.ArrayBuilder.make[SrcType] - private val dstIds = mutable.ArrayBuilder.make[DstType] + private val srcIds = mutable.ArrayBuilder.make[Src] + private val dstIds = mutable.ArrayBuilder.make[Dst] private val ratings = mutable.ArrayBuilder.make[Float] var size = 0 /** Adds a rating. */ - def add(r: Rating[SrcType, DstType]): this.type = { + def add(r: Rating[Src, Dst]): this.type = { size += 1 srcIds += r.user dstIds += r.item @@ -519,7 +526,7 @@ private[recommendation] object ALS extends Logging { } /** Merges another [[RatingBlockBuilder]]. */ - def merge(other: RatingBlock[SrcType, DstType]): this.type = { + def merge(other: RatingBlock[Src, Dst]): this.type = { size += other.srcIds.size srcIds ++= other.srcIds dstIds ++= other.dstIds @@ -528,8 +535,8 @@ private[recommendation] object ALS extends Logging { } /** Builds a [[RatingBlock]]. */ - def build(): RatingBlock[SrcType, DstType] = { - RatingBlock[SrcType, DstType](srcIds.result(), dstIds.result(), ratings.result()) + def build(): RatingBlock[Src, Dst] = { + RatingBlock[Src, Dst](srcIds.result(), dstIds.result(), ratings.result()) } } @@ -542,7 +549,9 @@ private[recommendation] object ALS extends Logging { * * @return an RDD of rating blocks in the form of ((srcBlockId, dstBlockId), ratingBlock) */ - private def partitionRatings[@specialized(Int, Long) User, @specialized(Int, Long) Item]( + private def partitionRatings[ + @specialized(Int, Long) User: ClassTag, + @specialized(Int, Long) Item: ClassTag]( ratings: RDD[Rating[User, Item]], srcPart: Partitioner, dstPart: Partitioner): RDD[((Int, Int), RatingBlock[User, Item])] = { @@ -590,10 +599,11 @@ private[recommendation] object ALS extends Logging { * Builder for uncompressed in-blocks of (srcId, dstEncodedIndex, rating) tuples. * @param encoder encoder for dst indices */ - private[recommendation] class UncompressedInBlockBuilder[@specialized(Int, Long) SrcType]( - encoder: LocalIndexEncoder) { + private[recommendation] class UncompressedInBlockBuilder[@specialized(Int, Long) Src: ClassTag]( + encoder: LocalIndexEncoder)( + implicit ord: Ordering[Src]) { - private val srcIds = mutable.ArrayBuilder.make[SrcType] + private val srcIds = mutable.ArrayBuilder.make[Src] private val dstEncodedIndices = mutable.ArrayBuilder.make[Int] private val ratings = mutable.ArrayBuilder.make[Float] @@ -607,7 +617,7 @@ private[recommendation] object ALS extends Logging { */ def add( dstBlockId: Int, - srcIds: Array[SrcType], + srcIds: Array[Src], dstLocalIndices: Array[Int], ratings: Array[Float]): this.type = { val sz = srcIds.size @@ -624,7 +634,7 @@ private[recommendation] object ALS extends Logging { } /** Builds a [[UncompressedInBlock]]. */ - def build(): UncompressedInBlock[SrcType] = { + def build(): UncompressedInBlock[Src] = { new UncompressedInBlock(srcIds.result(), dstEncodedIndices.result(), ratings.result()) } } @@ -632,10 +642,11 @@ private[recommendation] object ALS extends Logging { /** * A block of (srcId, dstEncodedIndex, rating) tuples stored in primitive arrays. 
*/ - private[recommendation] class UncompressedInBlock[@specialized(Int, Long) SrcType]( - val srcIds: Array[SrcType], + private[recommendation] class UncompressedInBlock[@specialized(Int, Long) Src: ClassTag]( + val srcIds: Array[Src], val dstEncodedIndices: Array[Int], - val ratings: Array[Float]) { + val ratings: Array[Float])( + implicit ord: Ordering[Src]) { /** Size the of block. */ def size: Int = srcIds.size @@ -645,11 +656,11 @@ private[recommendation] object ALS extends Logging { * sparse matrix from coordinate list (COO) format into compressed sparse column (CSC) format. * Sorting is done using Spark's built-in Timsort to avoid generating too many objects. */ - def compress(): InBlock[SrcType] = { + def compress(): InBlock[Src] = { val sz = size assert(sz > 0, "Empty in-link block should not exist.") sort() - val uniqueSrcIdsBuilder = mutable.ArrayBuilder.make[SrcType] + val uniqueSrcIdsBuilder = mutable.ArrayBuilder.make[Src] val dstCountsBuilder = mutable.ArrayBuilder.make[Int] var preSrcId = srcIds(0) uniqueSrcIdsBuilder += preSrcId @@ -689,8 +700,8 @@ private[recommendation] object ALS extends Logging { val sortId = Utils.random.nextInt() logDebug(s"Start sorting an uncompressed in-block of size $sz. (sortId = $sortId)") val start = System.nanoTime() - val sorter = new Sorter(new UncompressedInBlockSort[SrcType]) - sorter.sort(this, 0, size, Ordering[KeyWrapper[SrcType]]) + val sorter = new Sorter(new UncompressedInBlockSort[Src]) + sorter.sort(this, 0, size, Ordering[KeyWrapper[Src]]) val duration = (System.nanoTime() - start) / 1e9 logDebug(s"Sorting took $duration seconds. (sortId = $sortId)") } @@ -701,13 +712,13 @@ private[recommendation] object ALS extends Logging { * * @see [[UncompressedInBlockSort]] */ - private class KeyWrapper[@specialized(Int, Long) KeyType <: Ordered[KeyType]] - extends Ordered[KeyWrapper[KeyType]] { + private class KeyWrapper[@specialized(Int, Long) KeyType: ClassTag]( + implicit ord: Ordering[KeyType]) extends Ordered[KeyWrapper[KeyType]] { - private var key: KeyType = _ + var key: KeyType = _ override def compare(that: KeyWrapper[KeyType]): Int = { - key.compare(that.key) + ord.compare(key, that.key) } def setKey(key: KeyType): this.type = { @@ -719,15 +730,16 @@ private[recommendation] object ALS extends Logging { /** * [[SortDataFormat]] of [[UncompressedInBlock]] used by [[Sorter]]. 
*/ - private class UncompressedInBlockSort[@specialized(Int, Long) SrcType] - extends SortDataFormat[KeyWrapper[SrcType], UncompressedInBlock[SrcType]] { + private class UncompressedInBlockSort[@specialized(Int, Long) Src: ClassTag]( + implicit ord: Ordering[Src]) + extends SortDataFormat[KeyWrapper[Src], UncompressedInBlock[Src]] { - override def newKey(): KeyWrapper[SrcType] = new KeyWrapper() + override def newKey(): KeyWrapper[Src] = new KeyWrapper() override def getKey( - data: UncompressedInBlock[SrcType], + data: UncompressedInBlock[Src], pos: Int, - reuse: KeyWrapper[SrcType]): KeyWrapper[SrcType] = { + reuse: KeyWrapper[Src]): KeyWrapper[Src] = { if (reuse == null) { new KeyWrapper().setKey(data.srcIds(pos)) } else { @@ -736,8 +748,8 @@ private[recommendation] object ALS extends Logging { } override def getKey( - data: UncompressedInBlock[SrcType], - pos: Int): KeyWrapper[SrcType] = { + data: UncompressedInBlock[Src], + pos: Int): KeyWrapper[Src] = { getKey(data, pos, null) } @@ -750,16 +762,16 @@ private[recommendation] object ALS extends Logging { data(pos1) = tmp } - override def swap(data: UncompressedInBlock[SrcType], pos0: Int, pos1: Int): Unit = { + override def swap(data: UncompressedInBlock[Src], pos0: Int, pos1: Int): Unit = { swapElements(data.srcIds, pos0, pos1) swapElements(data.dstEncodedIndices, pos0, pos1) swapElements(data.ratings, pos0, pos1) } override def copyRange( - src: UncompressedInBlock[SrcType], + src: UncompressedInBlock[Src], srcPos: Int, - dst: UncompressedInBlock[SrcType], + dst: UncompressedInBlock[Src], dstPos: Int, length: Int): Unit = { System.arraycopy(src.srcIds, srcPos, dst.srcIds, dstPos, length) @@ -767,15 +779,15 @@ private[recommendation] object ALS extends Logging { System.arraycopy(src.ratings, srcPos, dst.ratings, dstPos, length) } - override def allocate(length: Int): UncompressedInBlock[SrcType] = { + override def allocate(length: Int): UncompressedInBlock[Src] = { new UncompressedInBlock( - new Array[SrcType](length), new Array[Int](length), new Array[Float](length)) + new Array[Src](length), new Array[Int](length), new Array[Float](length)) } override def copyElement( - src: UncompressedInBlock[SrcType], + src: UncompressedInBlock[Src], srcPos: Int, - dst: UncompressedInBlock[SrcType], + dst: UncompressedInBlock[Src], dstPos: Int): Unit = { dst.srcIds(dstPos) = src.srcIds(srcPos) dst.dstEncodedIndices(dstPos) = src.dstEncodedIndices(srcPos) @@ -791,20 +803,23 @@ private[recommendation] object ALS extends Logging { * @param dstPart partitioner for dst IDs * @return (in-blocks, out-blocks) */ - private def makeBlocks[@specialized(Int, Long) SrcType, @specialized(Int, Long) DstType]( + private def makeBlocks[ + @specialized(Int, Long) Src: ClassTag, + @specialized(Int, Long) Dst: ClassTag]( prefix: String, - ratingBlocks: RDD[((Int, Int), RatingBlock[SrcType, DstType])], + ratingBlocks: RDD[((Int, Int), RatingBlock[Src, Dst])], srcPart: Partitioner, dstPart: Partitioner)( - implicit ord: Ordering[DstType]): (RDD[(Int, InBlock[SrcType])], RDD[(Int, OutBlock)]) = { + implicit srcOrd: Ordering[Src], + dstOrd: Ordering[Dst]): (RDD[(Int, InBlock[Src])], RDD[(Int, OutBlock)]) = { val inBlocks = ratingBlocks.map { case ((srcBlockId, dstBlockId), RatingBlock(srcIds, dstIds, ratings)) => // The implementation is a faster version of // val dstIdToLocalIndex = dstIds.toSet.toSeq.sorted.zipWithIndex.toMap val start = System.nanoTime() - val dstIdSet = new OpenHashSet[DstType](1 << 20) + val dstIdSet = new OpenHashSet[Dst](1 << 20) 
dstIds.foreach(dstIdSet.add) - val sortedDstIds = new Array[DstType](dstIdSet.size) + val sortedDstIds = new Array[Dst](dstIdSet.size) var i = 0 var pos = dstIdSet.nextPos(0) while (pos != -1) { @@ -813,8 +828,8 @@ private[recommendation] object ALS extends Logging { i += 1 } assert(i == dstIdSet.size) - ju.Arrays.sort(sortedDstIds, ord) - val dstIdToLocalIndex = new OpenHashMap[DstType, Int](sortedDstIds.size) + Sorting.quickSort(sortedDstIds) + val dstIdToLocalIndex = new OpenHashMap[Dst, Int](sortedDstIds.size) i = 0 while (i < sortedDstIds.size) { dstIdToLocalIndex.update(sortedDstIds(i), i) @@ -827,7 +842,7 @@ private[recommendation] object ALS extends Logging { }.groupByKey(new HashPartitioner(srcPart.numPartitions)) .mapValues { iter => val builder = - new UncompressedInBlockBuilder[SrcType](new LocalIndexEncoder(dstPart.numPartitions)) + new UncompressedInBlockBuilder[Src](new LocalIndexEncoder(dstPart.numPartitions)) iter.foreach { case (dstBlockId, srcIds, dstLocalIndices, ratings) => builder.add(dstBlockId, srcIds, dstLocalIndices, ratings) } @@ -872,10 +887,10 @@ private[recommendation] object ALS extends Logging { * * @return dst factors */ - private def computeFactors[@specialized(Int, Long) SrcType]( + private def computeFactors[@specialized(Int, Long) Src]( srcFactorBlocks: RDD[(Int, FactorBlock)], srcOutBlocks: RDD[(Int, OutBlock)], - dstInBlocks: RDD[(Int, InBlock[SrcType])], + dstInBlocks: RDD[(Int, InBlock[Src])], rank: Int, regParam: Double, srcEncoder: LocalIndexEncoder, diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 9da253c61d36..cbeba66050f9 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -155,7 +155,7 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { } test("RatingBlockBuilder") { - val emptyBuilder = new RatingBlockBuilder() + val emptyBuilder = new RatingBlockBuilder[Int, Int]() assert(emptyBuilder.size === 0) val emptyBlock = emptyBuilder.build() assert(emptyBlock.srcIds.isEmpty) @@ -179,7 +179,7 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { test("UncompressedInBlock") { val encoder = new LocalIndexEncoder(10) - val uncompressed = new UncompressedInBlockBuilder(encoder) + val uncompressed = new UncompressedInBlockBuilder[Int](encoder) .add(0, Array(1, 0, 2), Array(0, 1, 4), Array(1.0f, 2.0f, 3.0f)) .add(1, Array(3, 0), Array(2, 5), Array(4.0f, 5.0f)) .build() @@ -228,15 +228,15 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { numItems: Int, rank: Int, noiseStd: Double = 0.0, - seed: Long = 11L): (RDD[Rating], RDD[Rating]) = { + seed: Long = 11L): (RDD[Rating[Int, Int]], RDD[Rating[Int, Int]]) = { val trainingFraction = 0.6 val testFraction = 0.3 val totalFraction = trainingFraction + testFraction val random = new Random(seed) val userFactors = genFactors(numUsers, rank, random) val itemFactors = genFactors(numItems, rank, random) - val training = ArrayBuffer.empty[Rating] - val test = ArrayBuffer.empty[Rating] + val training = ArrayBuffer.empty[Rating[Int, Int]] + val test = ArrayBuffer.empty[Rating[Int, Int]] for ((userId, userFactor) <- userFactors; (itemId, itemFactor) <- itemFactors) { val x = random.nextDouble() if (x < totalFraction) { @@ -268,7 +268,7 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { numItems: Int, 
rank: Int, noiseStd: Double = 0.0, - seed: Long = 11L): (RDD[Rating], RDD[Rating]) = { + seed: Long = 11L): (RDD[Rating[Int, Int]], RDD[Rating[Int, Int]]) = { // The assumption of the implicit feedback model is that unobserved ratings are more likely to // be negatives. val positiveFraction = 0.8 @@ -279,8 +279,8 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { val random = new Random(seed) val userFactors = genFactors(numUsers, rank, random) val itemFactors = genFactors(numItems, rank, random) - val training = ArrayBuffer.empty[Rating] - val test = ArrayBuffer.empty[Rating] + val training = ArrayBuffer.empty[Rating[Int, Int]] + val test = ArrayBuffer.empty[Rating[Int, Int]] for ((userId, userFactor) <- userFactors; (itemId, itemFactor) <- itemFactors) { val rating = blas.sdot(rank, userFactor, 1, itemFactor, 1) val threshold = if (rating > 0) positiveFraction else negativeFraction @@ -340,8 +340,8 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { * @param targetRMSE target test RMSE */ def testALS( - training: RDD[Rating], - test: RDD[Rating], + training: RDD[Rating[Int, Int]], + test: RDD[Rating[Int, Int]], rank: Int, maxIter: Int, regParam: Double, From 74f1f73ffbaa3346fb81294e8c8e0eb4653e644c Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 29 Jan 2015 12:40:33 -0800 Subject: [PATCH 5/8] compile but runtime error at test --- .../apache/spark/ml/recommendation/ALS.scala | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index ab4be6ad4647..e915c19f2c8b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -217,7 +217,7 @@ class ALS extends Estimator[ALSModel] with ALSParams { } } -private[recommendation] object ALS extends Logging { +object ALS extends Logging { /** Rating class for better code readability. */ private[recommendation] @@ -337,8 +337,8 @@ private[recommendation] object ALS extends Logging { * Implementation of the ALS algorithm. */ private def train[ - @specialized(Int, Long) User: ClassTag, - @specialized(Int, Long) Item: ClassTag]( + User: ClassTag, + Item: ClassTag]( ratings: RDD[Rating[User, Item]], rank: Int = 10, numUserBlocks: Int = 10, @@ -470,7 +470,7 @@ private[recommendation] object ALS extends Logging { * @param rank rank * @return initialized factor blocks */ - private def initialize[@specialized(Int, Long) Src]( + private def initialize[Src]( inBlocks: RDD[(Int, InBlock[Src])], rank: Int): RDD[(Int, FactorBlock)] = { // Choose a unit vector uniformly at random from the unit sphere, but from the @@ -494,7 +494,9 @@ private[recommendation] object ALS extends Logging { * A rating block that contains src IDs, dst IDs, and ratings, stored in primitive arrays. 
*/ private[recommendation] - case class RatingBlock[@specialized(Int, Long) Src, @specialized(Int, Long) Dst]( + case class RatingBlock[ + @specialized(Int, Long) Src, + @specialized(Int, Long) Dst]( srcIds: Array[Src], dstIds: Array[Dst], ratings: Array[Float]) { @@ -550,8 +552,8 @@ private[recommendation] object ALS extends Logging { * @return an RDD of rating blocks in the form of ((srcBlockId, dstBlockId), ratingBlock) */ private def partitionRatings[ - @specialized(Int, Long) User: ClassTag, - @specialized(Int, Long) Item: ClassTag]( + User: ClassTag, + Item: ClassTag]( ratings: RDD[Rating[User, Item]], srcPart: Partitioner, dstPart: Partitioner): RDD[((Int, Int), RatingBlock[User, Item])] = { @@ -804,8 +806,8 @@ private[recommendation] object ALS extends Logging { * @return (in-blocks, out-blocks) */ private def makeBlocks[ - @specialized(Int, Long) Src: ClassTag, - @specialized(Int, Long) Dst: ClassTag]( + Src: ClassTag, + Dst: ClassTag]( prefix: String, ratingBlocks: RDD[((Int, Int), RatingBlock[Src, Dst])], srcPart: Partitioner, @@ -887,7 +889,7 @@ private[recommendation] object ALS extends Logging { * * @return dst factors */ - private def computeFactors[@specialized(Int, Long) Src]( + private def computeFactors[Src]( srcFactorBlocks: RDD[(Int, FactorBlock)], srcOutBlocks: RDD[(Int, OutBlock)], dstInBlocks: RDD[(Int, InBlock[Src])], From 86588e1192a509bf5e94c9fc27ed93f507f5b80d Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 29 Jan 2015 12:50:29 -0800 Subject: [PATCH 6/8] use a single ID type for both users and items --- .../apache/spark/ml/recommendation/ALS.scala | 146 ++++++++---------- .../spark/ml/recommendation/ALSSuite.scala | 18 +-- 2 files changed, 76 insertions(+), 88 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index e915c19f2c8b..f496332b8fb9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -221,9 +221,9 @@ object ALS extends Logging { /** Rating class for better code readability. */ private[recommendation] - case class Rating[@specialized(Int, Long) User, @specialized(Int, Long) Item]( - user: User, - item: Item, + case class Rating[@specialized(Int, Long) ID]( + user: ID, + item: ID, rating: Float) /** Cholesky solver for least square problems. */ @@ -336,10 +336,8 @@ object ALS extends Logging { /** * Implementation of the ALS algorithm. 
*/ - private def train[ - User: ClassTag, - Item: ClassTag]( - ratings: RDD[Rating[User, Item]], + private def train[ID: ClassTag]( + ratings: RDD[Rating[ID]], rank: Int = 10, numUserBlocks: Int = 10, numItemBlocks: Int = 10, @@ -347,8 +345,7 @@ object ALS extends Logging { regParam: Double = 1.0, implicitPrefs: Boolean = false, alpha: Double = 1.0)( - implicit userOrd: Ordering[User], - itemOrd: Ordering[Item]): (RDD[(User, Array[Float])], RDD[(Item, Array[Float])]) = { + implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = { val userPart = new HashPartitioner(numUserBlocks) val itemPart = new HashPartitioner(numItemBlocks) val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions) @@ -451,14 +448,13 @@ object ALS extends Logging { * * @see [[LocalIndexEncoder]] */ - private[recommendation] case class InBlock[@specialized(Int, Long) Src: ClassTag]( - srcIds: Array[Src], + private[recommendation] case class InBlock[@specialized(Int, Long) ID: ClassTag]( + srcIds: Array[ID], dstPtrs: Array[Int], dstEncodedIndices: Array[Int], ratings: Array[Float]) { /** Size of the block. */ val size: Int = ratings.size - require(dstEncodedIndices.size == size) require(dstPtrs.size == srcIds.size + 1) } @@ -470,8 +466,8 @@ object ALS extends Logging { * @param rank rank * @return initialized factor blocks */ - private def initialize[Src]( - inBlocks: RDD[(Int, InBlock[Src])], + private def initialize[ID]( + inBlocks: RDD[(Int, InBlock[ID])], rank: Int): RDD[(Int, FactorBlock)] = { // Choose a unit vector uniformly at random from the unit sphere, but from the // "first quadrant" where all elements are nonnegative. This can be done by choosing @@ -494,11 +490,9 @@ object ALS extends Logging { * A rating block that contains src IDs, dst IDs, and ratings, stored in primitive arrays. */ private[recommendation] - case class RatingBlock[ - @specialized(Int, Long) Src, - @specialized(Int, Long) Dst]( - srcIds: Array[Src], - dstIds: Array[Dst], + case class RatingBlock[@specialized(Int, Long) ID]( + srcIds: Array[ID], + dstIds: Array[ID], ratings: Array[Float]) { /** Size of the block. */ val size: Int = srcIds.size @@ -509,17 +503,16 @@ object ALS extends Logging { /** * Builder for [[RatingBlock]]. [[mutable.ArrayBuilder]] is used to avoid boxing/unboxing. */ - private[recommendation] class RatingBlockBuilder[ - @specialized(Int, Long) Src: ClassTag, - @specialized(Int, Long) Dst: ClassTag] extends Serializable { + private[recommendation] class RatingBlockBuilder[@specialized(Int, Long) ID: ClassTag] + extends Serializable { - private val srcIds = mutable.ArrayBuilder.make[Src] - private val dstIds = mutable.ArrayBuilder.make[Dst] + private val srcIds = mutable.ArrayBuilder.make[ID] + private val dstIds = mutable.ArrayBuilder.make[ID] private val ratings = mutable.ArrayBuilder.make[Float] var size = 0 /** Adds a rating. */ - def add(r: Rating[Src, Dst]): this.type = { + def add(r: Rating[ID]): this.type = { size += 1 srcIds += r.user dstIds += r.item @@ -528,7 +521,7 @@ object ALS extends Logging { } /** Merges another [[RatingBlockBuilder]]. */ - def merge(other: RatingBlock[Src, Dst]): this.type = { + def merge(other: RatingBlock[ID]): this.type = { size += other.srcIds.size srcIds ++= other.srcIds dstIds ++= other.dstIds @@ -537,8 +530,8 @@ object ALS extends Logging { } /** Builds a [[RatingBlock]]. 
*/ - def build(): RatingBlock[Src, Dst] = { - RatingBlock[Src, Dst](srcIds.result(), dstIds.result(), ratings.result()) + def build(): RatingBlock[ID] = { + RatingBlock[ID](srcIds.result(), dstIds.result(), ratings.result()) } } @@ -551,12 +544,10 @@ object ALS extends Logging { * * @return an RDD of rating blocks in the form of ((srcBlockId, dstBlockId), ratingBlock) */ - private def partitionRatings[ - User: ClassTag, - Item: ClassTag]( - ratings: RDD[Rating[User, Item]], + private def partitionRatings[ID: ClassTag]( + ratings: RDD[Rating[ID]], srcPart: Partitioner, - dstPart: Partitioner): RDD[((Int, Int), RatingBlock[User, Item])] = { + dstPart: Partitioner): RDD[((Int, Int), RatingBlock[ID])] = { /* The implementation produces the same result as the following but generates less objects. @@ -570,7 +561,7 @@ object ALS extends Logging { val numPartitions = srcPart.numPartitions * dstPart.numPartitions ratings.mapPartitions { iter => - val builders = Array.fill(numPartitions)(new RatingBlockBuilder[User, Item]) + val builders = Array.fill(numPartitions)(new RatingBlockBuilder[ID]) iter.flatMap { r => val srcBlockId = srcPart.getPartition(r.user) val dstBlockId = dstPart.getPartition(r.item) @@ -591,7 +582,7 @@ object ALS extends Logging { } } }.groupByKey().mapValues { blocks => - val builder = new RatingBlockBuilder[User, Item] + val builder = new RatingBlockBuilder[ID] blocks.foreach(builder.merge) builder.build() }.setName("ratingBlocks") @@ -601,11 +592,11 @@ object ALS extends Logging { * Builder for uncompressed in-blocks of (srcId, dstEncodedIndex, rating) tuples. * @param encoder encoder for dst indices */ - private[recommendation] class UncompressedInBlockBuilder[@specialized(Int, Long) Src: ClassTag]( + private[recommendation] class UncompressedInBlockBuilder[@specialized(Int, Long) ID: ClassTag]( encoder: LocalIndexEncoder)( - implicit ord: Ordering[Src]) { + implicit ord: Ordering[ID]) { - private val srcIds = mutable.ArrayBuilder.make[Src] + private val srcIds = mutable.ArrayBuilder.make[ID] private val dstEncodedIndices = mutable.ArrayBuilder.make[Int] private val ratings = mutable.ArrayBuilder.make[Float] @@ -619,7 +610,7 @@ object ALS extends Logging { */ def add( dstBlockId: Int, - srcIds: Array[Src], + srcIds: Array[ID], dstLocalIndices: Array[Int], ratings: Array[Float]): this.type = { val sz = srcIds.size @@ -636,7 +627,7 @@ object ALS extends Logging { } /** Builds a [[UncompressedInBlock]]. */ - def build(): UncompressedInBlock[Src] = { + def build(): UncompressedInBlock[ID] = { new UncompressedInBlock(srcIds.result(), dstEncodedIndices.result(), ratings.result()) } } @@ -644,11 +635,11 @@ object ALS extends Logging { /** * A block of (srcId, dstEncodedIndex, rating) tuples stored in primitive arrays. */ - private[recommendation] class UncompressedInBlock[@specialized(Int, Long) Src: ClassTag]( - val srcIds: Array[Src], + private[recommendation] class UncompressedInBlock[@specialized(Int, Long) ID: ClassTag]( + val srcIds: Array[ID], val dstEncodedIndices: Array[Int], val ratings: Array[Float])( - implicit ord: Ordering[Src]) { + implicit ord: Ordering[ID]) { /** Size the of block. */ def size: Int = srcIds.size @@ -658,11 +649,11 @@ object ALS extends Logging { * sparse matrix from coordinate list (COO) format into compressed sparse column (CSC) format. * Sorting is done using Spark's built-in Timsort to avoid generating too many objects. 
*/ - def compress(): InBlock[Src] = { + def compress(): InBlock[ID] = { val sz = size assert(sz > 0, "Empty in-link block should not exist.") sort() - val uniqueSrcIdsBuilder = mutable.ArrayBuilder.make[Src] + val uniqueSrcIdsBuilder = mutable.ArrayBuilder.make[ID] val dstCountsBuilder = mutable.ArrayBuilder.make[Int] var preSrcId = srcIds(0) uniqueSrcIdsBuilder += preSrcId @@ -702,8 +693,8 @@ object ALS extends Logging { val sortId = Utils.random.nextInt() logDebug(s"Start sorting an uncompressed in-block of size $sz. (sortId = $sortId)") val start = System.nanoTime() - val sorter = new Sorter(new UncompressedInBlockSort[Src]) - sorter.sort(this, 0, size, Ordering[KeyWrapper[Src]]) + val sorter = new Sorter(new UncompressedInBlockSort[ID]) + sorter.sort(this, 0, size, Ordering[KeyWrapper[ID]]) val duration = (System.nanoTime() - start) / 1e9 logDebug(s"Sorting took $duration seconds. (sortId = $sortId)") } @@ -714,16 +705,16 @@ object ALS extends Logging { * * @see [[UncompressedInBlockSort]] */ - private class KeyWrapper[@specialized(Int, Long) KeyType: ClassTag]( - implicit ord: Ordering[KeyType]) extends Ordered[KeyWrapper[KeyType]] { + private class KeyWrapper[@specialized(Int, Long) ID: ClassTag]( + implicit ord: Ordering[ID]) extends Ordered[KeyWrapper[ID]] { - var key: KeyType = _ + var key: ID = _ - override def compare(that: KeyWrapper[KeyType]): Int = { + override def compare(that: KeyWrapper[ID]): Int = { ord.compare(key, that.key) } - def setKey(key: KeyType): this.type = { + def setKey(key: ID): this.type = { this.key = key this } @@ -732,16 +723,16 @@ object ALS extends Logging { /** * [[SortDataFormat]] of [[UncompressedInBlock]] used by [[Sorter]]. */ - private class UncompressedInBlockSort[@specialized(Int, Long) Src: ClassTag]( - implicit ord: Ordering[Src]) - extends SortDataFormat[KeyWrapper[Src], UncompressedInBlock[Src]] { + private class UncompressedInBlockSort[@specialized(Int, Long) ID: ClassTag]( + implicit ord: Ordering[ID]) + extends SortDataFormat[KeyWrapper[ID], UncompressedInBlock[ID]] { - override def newKey(): KeyWrapper[Src] = new KeyWrapper() + override def newKey(): KeyWrapper[ID] = new KeyWrapper() override def getKey( - data: UncompressedInBlock[Src], + data: UncompressedInBlock[ID], pos: Int, - reuse: KeyWrapper[Src]): KeyWrapper[Src] = { + reuse: KeyWrapper[ID]): KeyWrapper[ID] = { if (reuse == null) { new KeyWrapper().setKey(data.srcIds(pos)) } else { @@ -750,8 +741,8 @@ object ALS extends Logging { } override def getKey( - data: UncompressedInBlock[Src], - pos: Int): KeyWrapper[Src] = { + data: UncompressedInBlock[ID], + pos: Int): KeyWrapper[ID] = { getKey(data, pos, null) } @@ -764,16 +755,16 @@ object ALS extends Logging { data(pos1) = tmp } - override def swap(data: UncompressedInBlock[Src], pos0: Int, pos1: Int): Unit = { + override def swap(data: UncompressedInBlock[ID], pos0: Int, pos1: Int): Unit = { swapElements(data.srcIds, pos0, pos1) swapElements(data.dstEncodedIndices, pos0, pos1) swapElements(data.ratings, pos0, pos1) } override def copyRange( - src: UncompressedInBlock[Src], + src: UncompressedInBlock[ID], srcPos: Int, - dst: UncompressedInBlock[Src], + dst: UncompressedInBlock[ID], dstPos: Int, length: Int): Unit = { System.arraycopy(src.srcIds, srcPos, dst.srcIds, dstPos, length) @@ -781,15 +772,15 @@ object ALS extends Logging { System.arraycopy(src.ratings, srcPos, dst.ratings, dstPos, length) } - override def allocate(length: Int): UncompressedInBlock[Src] = { + override def allocate(length: Int): UncompressedInBlock[ID] = { 
new UncompressedInBlock( - new Array[Src](length), new Array[Int](length), new Array[Float](length)) + new Array[ID](length), new Array[Int](length), new Array[Float](length)) } override def copyElement( - src: UncompressedInBlock[Src], + src: UncompressedInBlock[ID], srcPos: Int, - dst: UncompressedInBlock[Src], + dst: UncompressedInBlock[ID], dstPos: Int): Unit = { dst.srcIds(dstPos) = src.srcIds(srcPos) dst.dstEncodedIndices(dstPos) = src.dstEncodedIndices(srcPos) @@ -805,23 +796,20 @@ object ALS extends Logging { * @param dstPart partitioner for dst IDs * @return (in-blocks, out-blocks) */ - private def makeBlocks[ - Src: ClassTag, - Dst: ClassTag]( + private def makeBlocks[ID: ClassTag]( prefix: String, - ratingBlocks: RDD[((Int, Int), RatingBlock[Src, Dst])], + ratingBlocks: RDD[((Int, Int), RatingBlock[ID])], srcPart: Partitioner, dstPart: Partitioner)( - implicit srcOrd: Ordering[Src], - dstOrd: Ordering[Dst]): (RDD[(Int, InBlock[Src])], RDD[(Int, OutBlock)]) = { + implicit srcOrd: Ordering[ID]): (RDD[(Int, InBlock[ID])], RDD[(Int, OutBlock)]) = { val inBlocks = ratingBlocks.map { case ((srcBlockId, dstBlockId), RatingBlock(srcIds, dstIds, ratings)) => // The implementation is a faster version of // val dstIdToLocalIndex = dstIds.toSet.toSeq.sorted.zipWithIndex.toMap val start = System.nanoTime() - val dstIdSet = new OpenHashSet[Dst](1 << 20) + val dstIdSet = new OpenHashSet[ID](1 << 20) dstIds.foreach(dstIdSet.add) - val sortedDstIds = new Array[Dst](dstIdSet.size) + val sortedDstIds = new Array[ID](dstIdSet.size) var i = 0 var pos = dstIdSet.nextPos(0) while (pos != -1) { @@ -831,7 +819,7 @@ object ALS extends Logging { } assert(i == dstIdSet.size) Sorting.quickSort(sortedDstIds) - val dstIdToLocalIndex = new OpenHashMap[Dst, Int](sortedDstIds.size) + val dstIdToLocalIndex = new OpenHashMap[ID, Int](sortedDstIds.size) i = 0 while (i < sortedDstIds.size) { dstIdToLocalIndex.update(sortedDstIds(i), i) @@ -844,7 +832,7 @@ object ALS extends Logging { }.groupByKey(new HashPartitioner(srcPart.numPartitions)) .mapValues { iter => val builder = - new UncompressedInBlockBuilder[Src](new LocalIndexEncoder(dstPart.numPartitions)) + new UncompressedInBlockBuilder[ID](new LocalIndexEncoder(dstPart.numPartitions)) iter.foreach { case (dstBlockId, srcIds, dstLocalIndices, ratings) => builder.add(dstBlockId, srcIds, dstLocalIndices, ratings) } @@ -889,10 +877,10 @@ object ALS extends Logging { * * @return dst factors */ - private def computeFactors[Src]( + private def computeFactors[ID]( srcFactorBlocks: RDD[(Int, FactorBlock)], srcOutBlocks: RDD[(Int, OutBlock)], - dstInBlocks: RDD[(Int, InBlock[Src])], + dstInBlocks: RDD[(Int, InBlock[ID])], rank: Int, regParam: Double, srcEncoder: LocalIndexEncoder, diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index cbeba66050f9..79abc5a713ff 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -155,7 +155,7 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { } test("RatingBlockBuilder") { - val emptyBuilder = new RatingBlockBuilder[Int, Int]() + val emptyBuilder = new RatingBlockBuilder[Int]() assert(emptyBuilder.size === 0) val emptyBlock = emptyBuilder.build() assert(emptyBlock.srcIds.isEmpty) @@ -228,15 +228,15 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { numItems: Int, rank: 
Int, noiseStd: Double = 0.0, - seed: Long = 11L): (RDD[Rating[Int, Int]], RDD[Rating[Int, Int]]) = { + seed: Long = 11L): (RDD[Rating[Int]], RDD[Rating[Int]]) = { val trainingFraction = 0.6 val testFraction = 0.3 val totalFraction = trainingFraction + testFraction val random = new Random(seed) val userFactors = genFactors(numUsers, rank, random) val itemFactors = genFactors(numItems, rank, random) - val training = ArrayBuffer.empty[Rating[Int, Int]] - val test = ArrayBuffer.empty[Rating[Int, Int]] + val training = ArrayBuffer.empty[Rating[Int]] + val test = ArrayBuffer.empty[Rating[Int]] for ((userId, userFactor) <- userFactors; (itemId, itemFactor) <- itemFactors) { val x = random.nextDouble() if (x < totalFraction) { @@ -268,7 +268,7 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { numItems: Int, rank: Int, noiseStd: Double = 0.0, - seed: Long = 11L): (RDD[Rating[Int, Int]], RDD[Rating[Int, Int]]) = { + seed: Long = 11L): (RDD[Rating[Int]], RDD[Rating[Int]]) = { // The assumption of the implicit feedback model is that unobserved ratings are more likely to // be negatives. val positiveFraction = 0.8 @@ -279,8 +279,8 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { val random = new Random(seed) val userFactors = genFactors(numUsers, rank, random) val itemFactors = genFactors(numItems, rank, random) - val training = ArrayBuffer.empty[Rating[Int, Int]] - val test = ArrayBuffer.empty[Rating[Int, Int]] + val training = ArrayBuffer.empty[Rating[Int]] + val test = ArrayBuffer.empty[Rating[Int]] for ((userId, userFactor) <- userFactors; (itemId, itemFactor) <- itemFactors) { val rating = blas.sdot(rank, userFactor, 1, itemFactor, 1) val threshold = if (rating > 0) positiveFraction else negativeFraction @@ -340,8 +340,8 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { * @param targetRMSE target test RMSE */ def testALS( - training: RDD[Rating[Int, Int]], - test: RDD[Rating[Int, Int]], + training: RDD[Rating[Int]], + test: RDD[Rating[Int]], rank: Int, maxIter: Int, regParam: Double, From c2db5e512bc9550b0302c5a18bb0dbfe7ef03988 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 29 Jan 2015 15:34:45 -0800 Subject: [PATCH 7/8] make test pass --- .../apache/spark/ml/recommendation/ALS.scala | 63 +++++++++---------- .../spark/ml/recommendation/ALSSuite.scala | 4 +- 2 files changed, 33 insertions(+), 34 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index f496332b8fb9..e7dc42fc1dc4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -291,7 +291,7 @@ object ALS extends Logging { /** Adds an observation. */ def add(a: Array[Float], b: Float): this.type = { - require(a.size == k) + require(a.length == k) copyToDouble(a) blas.dspr(upper, k, 1.0, da, 1, ata) blas.daxpy(k, b.toDouble, da, 1, atb, 1) @@ -303,7 +303,7 @@ object ALS extends Logging { * Adds an observation with implicit feedback. Note that this does not increment the counter. */ def addImplicit(a: Array[Float], b: Float, alpha: Double): this.type = { - require(a.size == k) + require(a.length == k) // Extension to the original paper to handle b < 0. confidence is a function of |b| instead // so that it is never negative. 
val confidence = 1.0 + alpha * math.abs(b) @@ -319,8 +319,8 @@ object ALS extends Logging { /** Merges another normal equation object. */ def merge(other: NormalEquation): this.type = { require(other.k == k) - blas.daxpy(ata.size, 1.0, other.ata, 1, ata, 1) - blas.daxpy(atb.size, 1.0, other.atb, 1, atb, 1) + blas.daxpy(ata.length, 1.0, other.ata, 1, ata, 1) + blas.daxpy(atb.length, 1.0, other.atb, 1, atb, 1) n += other.n this } @@ -454,9 +454,9 @@ object ALS extends Logging { dstEncodedIndices: Array[Int], ratings: Array[Float]) { /** Size of the block. */ - val size: Int = ratings.size - require(dstEncodedIndices.size == size) - require(dstPtrs.size == srcIds.size + 1) + def size: Int = ratings.length + require(dstEncodedIndices.length == size) + require(dstPtrs.length == srcIds.length + 1) } /** @@ -476,7 +476,7 @@ object ALS extends Logging { // (<1%) compared picking elements uniformly at random in [0,1]. inBlocks.map { case (srcBlockId, inBlock) => val random = new XORShiftRandom(srcBlockId) - val factors = Array.fill(inBlock.srcIds.size) { + val factors = Array.fill(inBlock.srcIds.length) { val factor = Array.fill(rank)(random.nextGaussian().toFloat) val nrm = blas.snrm2(rank, factor, 1) blas.sscal(rank, 1.0f / nrm, factor, 1) @@ -489,15 +489,14 @@ object ALS extends Logging { /** * A rating block that contains src IDs, dst IDs, and ratings, stored in primitive arrays. */ - private[recommendation] - case class RatingBlock[@specialized(Int, Long) ID]( + case class RatingBlock[@specialized(Int, Long) ID: ClassTag]( srcIds: Array[ID], dstIds: Array[ID], ratings: Array[Float]) { /** Size of the block. */ - val size: Int = srcIds.size - require(dstIds.size == size) - require(ratings.size == size) + def size: Int = srcIds.length + require(dstIds.length == srcIds.length) + require(ratings.length == srcIds.length) } /** @@ -522,7 +521,7 @@ object ALS extends Logging { /** Merges another [[RatingBlockBuilder]]. */ def merge(other: RatingBlock[ID]): this.type = { - size += other.srcIds.size + size += other.srcIds.length srcIds ++= other.srcIds dstIds ++= other.dstIds ratings ++= other.ratings @@ -531,7 +530,7 @@ object ALS extends Logging { /** Builds a [[RatingBlock]]. 
*/ def build(): RatingBlock[ID] = { - RatingBlock[ID](srcIds.result(), dstIds.result(), ratings.result()) + new RatingBlock[ID](srcIds.result(), dstIds.result(), ratings.result()) } } @@ -568,14 +567,14 @@ object ALS extends Logging { val idx = srcBlockId + srcPart.numPartitions * dstBlockId val builder = builders(idx) builder.add(r) - if (builder.size >= 2048) { // 2048 * (3 * 4) = 24k + if (builder.length >= 2048) { // 2048 * (3 * 4) = 24k builders(idx) = new RatingBlockBuilder Iterator.single(((srcBlockId, dstBlockId), builder.build())) } else { Iterator.empty } } ++ { - builders.view.zipWithIndex.filter(_._1.size > 0).map { case (block, idx) => + builders.view.zipWithIndex.filter(_._1.length > 0).map { case (block, idx) => val srcBlockId = idx % srcPart.numPartitions val dstBlockId = idx / srcPart.numPartitions ((srcBlockId, dstBlockId), block.build()) @@ -613,9 +612,9 @@ object ALS extends Logging { srcIds: Array[ID], dstLocalIndices: Array[Int], ratings: Array[Float]): this.type = { - val sz = srcIds.size - require(dstLocalIndices.size == sz) - require(ratings.size == sz) + val sz = srcIds.length + require(dstLocalIndices.length == sz) + require(ratings.length == sz) this.srcIds ++= srcIds this.ratings ++= ratings var j = 0 @@ -642,7 +641,7 @@ object ALS extends Logging { implicit ord: Ordering[ID]) { /** Size the of block. */ - def size: Int = srcIds.size + def length: Int = srcIds.length /** * Compresses the block into an [[InBlock]]. The algorithm is the same as converting a @@ -650,7 +649,7 @@ object ALS extends Logging { * Sorting is done using Spark's built-in Timsort to avoid generating too many objects. */ def compress(): InBlock[ID] = { - val sz = size + val sz = length assert(sz > 0, "Empty in-link block should not exist.") sort() val uniqueSrcIdsBuilder = mutable.ArrayBuilder.make[ID] @@ -674,7 +673,7 @@ object ALS extends Logging { } dstCountsBuilder += curCount val uniqueSrcIds = uniqueSrcIdsBuilder.result() - val numUniqueSrdIds = uniqueSrcIds.size + val numUniqueSrdIds = uniqueSrcIds.length val dstCounts = dstCountsBuilder.result() val dstPtrs = new Array[Int](numUniqueSrdIds + 1) var sum = 0 @@ -688,13 +687,13 @@ object ALS extends Logging { } private def sort(): Unit = { - val sz = size + val sz = length // Since there might be interleaved log messages, we insert a unique id for easy pairing. val sortId = Utils.random.nextInt() logDebug(s"Start sorting an uncompressed in-block of size $sz. (sortId = $sortId)") val start = System.nanoTime() val sorter = new Sorter(new UncompressedInBlockSort[ID]) - sorter.sort(this, 0, size, Ordering[KeyWrapper[ID]]) + sorter.sort(this, 0, length, Ordering[KeyWrapper[ID]]) val duration = (System.nanoTime() - start) / 1e9 logDebug(s"Sorting took $duration seconds. 
(sortId = $sortId)") } @@ -819,9 +818,9 @@ object ALS extends Logging { } assert(i == dstIdSet.size) Sorting.quickSort(sortedDstIds) - val dstIdToLocalIndex = new OpenHashMap[ID, Int](sortedDstIds.size) + val dstIdToLocalIndex = new OpenHashMap[ID, Int](sortedDstIds.length) i = 0 - while (i < sortedDstIds.size) { + while (i < sortedDstIds.length) { dstIdToLocalIndex.update(sortedDstIds(i), i) i += 1 } @@ -843,7 +842,7 @@ object ALS extends Logging { val activeIds = Array.fill(dstPart.numPartitions)(mutable.ArrayBuilder.make[Int]) var i = 0 val seen = new Array[Boolean](dstPart.numPartitions) - while (i < srcIds.size) { + while (i < srcIds.length) { var j = dstPtrs(i) ju.Arrays.fill(seen, false) while (j < dstPtrs(i + 1)) { @@ -886,7 +885,7 @@ object ALS extends Logging { srcEncoder: LocalIndexEncoder, implicitPrefs: Boolean = false, alpha: Double = 1.0): RDD[(Int, FactorBlock)] = { - val numSrcBlocks = srcFactorBlocks.partitions.size + val numSrcBlocks = srcFactorBlocks.partitions.length val YtY = if (implicitPrefs) Some(computeYtY(srcFactorBlocks, rank)) else None val srcOut = srcOutBlocks.join(srcFactorBlocks).flatMap { case (srcBlockId, (srcOutBlock, srcFactors)) => @@ -894,18 +893,18 @@ object ALS extends Logging { (dstBlockId, (srcBlockId, activeIndices.map(idx => srcFactors(idx)))) } } - val merged = srcOut.groupByKey(new HashPartitioner(dstInBlocks.partitions.size)) + val merged = srcOut.groupByKey(new HashPartitioner(dstInBlocks.partitions.length)) dstInBlocks.join(merged).mapValues { case (InBlock(dstIds, srcPtrs, srcEncodedIndices, ratings), srcFactors) => val sortedSrcFactors = new Array[FactorBlock](numSrcBlocks) srcFactors.foreach { case (srcBlockId, factors) => sortedSrcFactors(srcBlockId) = factors } - val dstFactors = new Array[Array[Float]](dstIds.size) + val dstFactors = new Array[Array[Float]](dstIds.length) var j = 0 val ls = new NormalEquation(rank) val solver = new CholeskySolver // TODO: add NNLS solver - while (j < dstIds.size) { + while (j < dstIds.length) { ls.reset() if (implicitPrefs) { ls.merge(YtY.get) diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 79abc5a713ff..248fedbb17da 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -183,8 +183,8 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { .add(0, Array(1, 0, 2), Array(0, 1, 4), Array(1.0f, 2.0f, 3.0f)) .add(1, Array(3, 0), Array(2, 5), Array(4.0f, 5.0f)) .build() - assert(uncompressed.size === 5) - val records = Seq.tabulate(uncompressed.size) { i => + assert(uncompressed.length === 5) + val records = Seq.tabulate(uncompressed.length) { i => val dstEncodedIndex = uncompressed.dstEncodedIndices(i) val dstBlockId = encoder.blockId(dstEncodedIndex) val dstLocalIndex = encoder.localIndex(dstEncodedIndex) From 135f7417022b34c37c4b56f0e44c943244962d11 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 29 Jan 2015 16:16:52 -0800 Subject: [PATCH 8/8] minor update --- .../apache/spark/ml/recommendation/ALS.scala | 26 ++++++++++++------- .../spark/ml/recommendation/ALSSuite.scala | 12 +++++++++ 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index e7dc42fc1dc4..c48afe39e3e2 100644 --- 
a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -28,6 +28,7 @@ import com.github.fommil.netlib.LAPACK.{getInstance => lapack} import org.netlib.util.intW import org.apache.spark.{HashPartitioner, Logging, Partitioner} +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.param._ import org.apache.spark.rdd.RDD @@ -201,7 +202,7 @@ class ALS extends Estimator[ALSModel] with ALSParams { val ratings = dataset .select(col(map(userCol)), col(map(itemCol)), col(map(ratingCol)).cast(FloatType)) .map { row => - new Rating(row.getInt(0), row.getInt(1), row.getFloat(2)) + Rating(row.getInt(0), row.getInt(1), row.getFloat(2)) } val (userFactors, itemFactors) = ALS.train(ratings, rank = map(rank), numUserBlocks = map(numUserBlocks), numItemBlocks = map(numItemBlocks), @@ -217,14 +218,19 @@ class ALS extends Estimator[ALSModel] with ALSParams { } } +/** + * :: DeveloperApi :: + * An implementation of ALS that supports generic ID types, specialized for Int and Long. This is + * exposed as a developer API for users who do need other ID types. But it is not recommended + * because it increases the shuffle size and memory requirement during training. For simplicity, + * users and items must have the same type. The number of distinct users/items should be smaller + * than 2 billion. + */ +@DeveloperApi object ALS extends Logging { /** Rating class for better code readability. */ - private[recommendation] - case class Rating[@specialized(Int, Long) ID]( - user: ID, - item: ID, - rating: Float) + case class Rating[@specialized(Int, Long) ID](user: ID, item: ID, rating: Float) /** Cholesky solver for least square problems. */ private[recommendation] class CholeskySolver { @@ -336,7 +342,7 @@ object ALS extends Logging { /** * Implementation of the ALS algorithm. */ - private def train[ID: ClassTag]( + def train[ID: ClassTag]( ratings: RDD[Rating[ID]], rank: Int = 10, numUserBlocks: Int = 10, @@ -489,7 +495,7 @@ object ALS extends Logging { /** * A rating block that contains src IDs, dst IDs, and ratings, stored in primitive arrays. 
*/ - case class RatingBlock[@specialized(Int, Long) ID: ClassTag]( + private[recommendation] case class RatingBlock[@specialized(Int, Long) ID: ClassTag]( srcIds: Array[ID], dstIds: Array[ID], ratings: Array[Float]) { @@ -567,14 +573,14 @@ object ALS extends Logging { val idx = srcBlockId + srcPart.numPartitions * dstBlockId val builder = builders(idx) builder.add(r) - if (builder.length >= 2048) { // 2048 * (3 * 4) = 24k + if (builder.size >= 2048) { // 2048 * (3 * 4) = 24k builders(idx) = new RatingBlockBuilder Iterator.single(((srcBlockId, dstBlockId), builder.build())) } else { Iterator.empty } } ++ { - builders.view.zipWithIndex.filter(_._1.length > 0).map { case (block, idx) => + builders.view.zipWithIndex.filter(_._1.size > 0).map { case (block, idx) => val srcBlockId = idx % srcPart.numPartitions val dstBlockId = idx / srcPart.numPartitions ((srcBlockId, dstBlockId), block.build()) diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 248fedbb17da..07aff56fb7d2 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -432,4 +432,16 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { testALS(training, test, maxIter = 4, rank = 2, regParam = 0.01, implicitPrefs = true, targetRMSE = 0.3) } + + test("using generic ID types") { + val (ratings, _) = genImplicitTestData(numUsers = 20, numItems = 40, rank = 2, noiseStd = 0.01) + + val longRatings = ratings.map(r => Rating(r.user.toLong, r.item.toLong, r.rating)) + val (longUserFactors, _) = ALS.train(longRatings, rank = 2, maxIter = 4) + assert(longUserFactors.first()._1.getClass === classOf[Long]) + + val strRatings = ratings.map(r => Rating(r.user.toString, r.item.toString, r.rating)) + val (strUserFactors, _) = ALS.train(strRatings, rank = 2, maxIter = 4) + assert(strUserFactors.first()._1.getClass === classOf[String]) + } }