
Conversation


@mpjlu mpjlu commented Apr 24, 2017

What changes were proposed in this pull request?

The recommendForAll of MLLIB ALS is very slow.
GC is a key problem of the current method.
The task uses the following code to keep the temp result:
val output = new Array[(Int, (Int, Double))](m * n)
where m = n = 4096 (the default value, with no way to set it),
so output is about 4k * 4k * (4 + 4 + 8) bytes = 256MB. This large allocation causes serious GC problems and frequently OOMs.

Actually, we don't need to save all the temp results. Suppose we recommend the top K (K is about 10 or 20) products for each user; then we only need 4k * K * (4 + 4 + 8) bytes to save the temp result. A condensed sketch of the approach is below.
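A minimal sketch of the idea, assembled from the review fragments later in this thread (not the exact merged diff; it assumes blockify yields Seq[(Int, Array[Double])] blocks and that num is the top-K count):

import org.apache.spark.util.BoundedPriorityQueue

val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
  val m = srcIter.size
  val n = math.min(dstIter.size, num)
  val output = new Array[(Int, (Int, Double))](m * n)
  var j = 0
  srcIter.foreach { case (srcId, srcFactor) =>
    // Keep only the top-n scores per source id, so the temp result is
    // m * n instead of m * blockSize.
    val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
    dstIter.foreach { case (dstId, dstFactor) =>
      var score = 0.0
      var k = 0
      while (k < rank) {
        score += srcFactor(k) * dstFactor(k)
        k += 1
      }
      pq += dstId -> score
    }
    pq.foreach { case (dstId, score) =>
      output(j) = (srcId, (dstId, score))
      j += 1
    }
  }
  output.take(j).toSeq
}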

The Test Environment:
3 workers: each worker has 10 cores, 30GB memory, and 1 executor.
The Data: 480,000 users and 17,000 items.

BlockSize:     1024  2048  4096  8192
Old method:    245s  332s  488s  OOM
This solution: 121s  118s  117s  120s

How was this patch tested?

The existing UT.


SparkQA commented Apr 24, 2017

Test build #76100 has finished for PR 17742 at commit 14cdbf6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


MLnick commented Apr 24, 2017

Interesting - I was working on something very similar - a rough draft of it is in a branch.


SparkQA commented Apr 24, 2017

Test build #76111 has finished for PR 17742 at commit f607e6c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mpjlu mpjlu changed the title [Spark-20446][ML][MLLIB]Optimize MLLIB ALS recommendForAll [Spark-11968][ML][MLLIB]Optimize MLLIB ALS recommendForAll Apr 25, 2017

MLnick commented Apr 26, 2017

Could you post updated performance numbers?

I think we can do the same optimization in the ml version. We could do it in a follow-up PR perhaps, though I am OK with putting it in this PR too.

k += 1
}
output.toSeq
case (users, items) =>

Put the case statement on the previous line: flatMap { case (... =>

k += 1
}
output.toSeq
case (users, items) =>

Prefer case (srcIter, dstIter) rather than users / items (as they can be swapped depending on which recommendation method is being called).

val n = math.min(items.size, num)
val output = new Array[(Int, (Int, Double))](m * n)
var j = 0
users.foreach (user => {

srcIter.foreach { case (srcId, srcFactor) =>

def order(a: (Int, Double)) = a._2
val pq: BoundedPriorityQueue[(Int, Double)] =
new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(order))
items.foreach (item => {

Similarly here: dstIter.foreach { case (dstId, dstFactor) =>

var rate: Double = 0
var k = 0
while(k < rank) {
rate += user._2(k) * item._2(k)

Then we can have rate += srcFactor(k) * dstFactor(k)

Also, can we call it score or prediction rather than rate?

val n = math.min(items.size, num)
val output = new Array[(Int, (Int, Double))](m * n)
var j = 0
users.foreach (user => {

Will there be a performance benefit to using a while loop here vs foreach?


I will test `while` here, thanks.

*/
var rate: Double = 0
var k = 0
while(k < rank) {

Space here: while (

})
val pqIter = pq.iterator
var i = 0
while(i < n) {

Space here: while (

rate += user._2(k) * item._2(k)
k += 1
}
pq += ((item._1, rate))

Here we can then use dstId instead

val pqIter = pq.iterator
var i = 0
while(i < n) {
output(j + i) = (user._1, pqIter.next())

And here srcId instead


SparkQA commented Apr 26, 2017

Test build #76176 has finished for PR 17742 at commit ae03124.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


mpjlu commented Apr 26, 2017

Hi @MLnick, the new test results are:
3 workers, each with 10 cores, 30GB memory, and 1 executor.
Data size: 3,290,000 users, 200,000 items.
recommendProductsForUsers with blockSize 4096 takes about 37 min.


mpjlu commented Apr 26, 2017

Another case:
3 workers, each with 40 cores, 196GB memory, and 1 executor.
Data size: 480,000 users, 17,000 items.
recommendProductsForUsers with blockSize 4096 takes about 34s.


mpjlu commented Apr 26, 2017

Thanks very much @MLnick.
I am doing more testing of the mllib solution. When it is solid enough, we can submit a follow-up PR for the ML optimization. What do you think?


SparkQA commented Apr 26, 2017

Test build #76180 has finished for PR 17742 at commit 2fd97a0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


MLnick commented Apr 27, 2017

@mpjlu yeah, we can do the ML version in a follow-up PR, that is OK (I can help if needed).

users.foreach (user => {
srcIter.foreach { case (srcId, srcFactor) =>
def order(a: (Int, Double)) = a._2
val pq: BoundedPriorityQueue[(Int, Double)] =

We could remove the type sig from the val definition here to make it fit on one line

srcIter.foreach { case (srcId, srcFactor) =>
def order(a: (Int, Double)) = a._2
val pq: BoundedPriorityQueue[(Int, Double)] =
new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(order))

I believe you can just do Ordering.by(_._2) without needing to define def order(... above

* blas.ddot (F2jBLAS) has the same performance as the following code.
* The performance of blas.ddot with NativeBLAS is very bad.
* blas.ddot (F2jBLAS) is about a 10% improvement compared with linalg.dot.
* val rate = blas.ddot(rank, user._2, 1, item._2, 1)

We can perhaps say here instead "The below code is equivalent to val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1)"
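As an illustration of that equivalence, a minimal sketch (assuming netlib-java's F2jBLAS, which mllib falls back to when native BLAS is unavailable; the helper names dot and dotBlas are hypothetical):

import com.github.fommil.netlib.F2jBLAS

val blas = new F2jBLAS

// Hand-rolled dot product over the first `rank` entries of two factor vectors.
def dot(srcFactor: Array[Double], dstFactor: Array[Double], rank: Int): Double = {
  var score = 0.0
  var k = 0
  while (k < rank) {
    score += srcFactor(k) * dstFactor(k)
    k += 1
  }
  score
}

// The equivalent level-1 BLAS call the comment refers to:
def dotBlas(srcFactor: Array[Double], dstFactor: Array[Double], rank: Int): Double =
  blas.ddot(rank, srcFactor, 1, dstFactor, 1)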


/**
* Blockifies features to use Level-3 BLAS.
*/

We should adjust the comment here as we're not using Level-3 BLAS any more.

k += 1
}
output.toSeq
val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>

I'd like to add more detail to the doc string comment for this method to explain the approach used for efficiency.


jtengyp commented Apr 28, 2017

I did some tests with the PR.
Here is the cluster configuration:
3 workers, each with 10 cores and 30GB memory.
With the Netflix dataset (480,189 users and 17,770 movies), the recommendProductsForUsers time drops from 488.36s to 60.93s, 8x faster than the original method.

With a larger dataset (3.29 million users and 0.21 million products), the recommendProductsForUsers time drops from 48h to 39min, 73x faster than the original method.


SparkQA commented Apr 28, 2017

Test build #76254 has finished for PR 17742 at commit 206a023.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val output = new Array[(Int, (Int, Double))](m * n)
var j = 0
srcIter.foreach { case (srcId, srcFactor) =>
val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))

Nit: there are several 4-space indents here that should be 2


Thanks @srowen


SparkQA commented Apr 28, 2017

Test build #76258 has finished for PR 17742 at commit 8eab55b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Apr 28, 2017

Test build #76260 has started for PR 17742 at commit a5261b7.


MLnick commented May 9, 2017

I changed the commit message to drop the [ML] by the way

asfgit pushed a commit that referenced this pull request May 9, 2017
This PR is a `DataFrame` version of #17742 for [SPARK-11968](https://issues.apache.org/jira/browse/SPARK-11968), for improving the performance of `recommendAll` methods.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath <[email protected]>

Closes #17845 from MLnick/ml-als-perf.

(cherry picked from commit 10b00ab)
Signed-off-by: Nick Pentreath <[email protected]>
ghost pushed a commit to dbtsai/spark that referenced this pull request May 9, 2017
This PR is a `DataFrame` version of apache#17742 for [SPARK-11968](https://issues.apache.org/jira/browse/SPARK-11968), for improving the performance of `recommendAll` methods.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath <[email protected]>

Closes apache#17845 from MLnick/ml-als-perf.

mengxr commented May 10, 2017

I think the problem is not BLAS-3 ops, nor the 256MB total memory. The val output = new Array[(Int, (Int, Double))](m * n) is not specialized: each element holds two references. If m = 4096 and n = 4096, in total we have 33.5 million objects, which causes GC pressure. The implementation in this PR changed n to k, which significantly reduces the total number of temp objects. But it doesn't mean that we should drop BLAS-3.

@mpjlu Could you test the following?

  • Change the block size to 2048, which reduces the max possible number of temp objects.
  • After val ratings = srcFactors.transpose.multiply(dstFactors), do not construct output. There are two options:
      • The most optimized version would be doing a quickselect on each row and selecting the top k (see the sketch after this comment).
      • An easy-to-implement version would be:

Iterator.range(0, m).flatMap { i =>
  Iterator.range(0, n).map { j =>
    (srcIds(i), (dstIds(j), ratings(i, j)))
  }
}

The second option is just a quick test, sacrificing some performance. The temp objects created this way have a very short life, and GC should be able to handle them. Then very likely we don't need to do top-k inside ALS, because the topByKey implementation is doing the same: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42.
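For reference, a minimal sketch of the first option (a hypothetical helper, not from this PR): partially order one row of the gemm output so the k largest scores come first, in expected O(n) rather than O(n log n).

// Hypothetical helper: returns the top-k (dstId, score) pairs of one row
// using in-place quickselect over an index array (descending by score).
def topKByQuickselect(row: Array[Double], dstIds: Array[Int], k: Int): Array[(Int, Double)] = {
  require(k > 0 && k <= row.length)
  val idx = Array.range(0, row.length)
  def swap(a: Int, b: Int): Unit = { val t = idx(a); idx(a) = idx(b); idx(b) = t }
  var lo = 0
  var hi = row.length - 1
  while (lo < hi) {
    val pivot = row(idx((lo + hi) / 2))
    var i = lo
    var j = hi
    // Hoare partition in descending order.
    while (i <= j) {
      while (row(idx(i)) > pivot) i += 1
      while (row(idx(j)) < pivot) j -= 1
      if (i <= j) { swap(i, j); i += 1; j -= 1 }
    }
    if (k - 1 <= j) hi = j        // k-th largest lies in the left part
    else if (k - 1 >= i) lo = i   // k-th largest lies in the right part
    else lo = hi                  // it lies between j and i: done
  }
  idx.take(k).map(t => (dstIds(t), row(t)))
}

Applied per row of the gemm output, followed by topByKey across blocks, this would avoid emitting all n scores per source id.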


mpjlu commented May 10, 2017

Thanks @mengxr
I have tested different block sizes, see https://issues.apache.org/jira/browse/SPARK-20443
I will test the other methods you mentioned.


mpjlu commented May 10, 2017

I don't think we should use BLAS-3 here, because whether we use output or not, we need a big buffer to hold the BLAS result, which still causes GC problems.
I also want to test building a handwritten native gemm that does not return C (C = A*B) but only returns the topK elements of each row. This may perform better than the current solution.


mengxr commented May 10, 2017

A single buffer doesn't lead to long GC pauses. If it requests a lot of memory, it might trigger GC to collect other objects, but the buffer itself is a single object, which can be easily GC'ed. The problem here is having many small long-living objects, as in output.


mpjlu commented May 10, 2017

Thanks, I will do some tests based on BLAS-3.


mpjlu commented May 10, 2017

      • The most optimized version would be doing a quickselect on each row and selecting the top k.
      • An easy-to-implement version would be: (the Iterator.range version above)

I tested both of these methods; the best performance is about 50% of this PR (i.e., this PR is about 2x faster than either). Native BLAS is MKL.
You are welcome to do more tests.


MLnick commented May 10, 2017

BLAS-3 while still keeping the output size as n x m rather than n x k results in massively more shuffle data - I don't think any solution based on exploding the intermediate data so much can be as efficient as this, since for k=10 it's ~80k little objects per block vs ~33 million...

I had a version using BLAS 3 followed by a sort per row (see https://issues.apache.org/jira/browse/SPARK-11968 for branch link and test details). For MLLIB it was slower than this approach by a factor of 1.5x. I just re-tested for ML and it is 56s vs 16s for this approach, so really significantly slower.

Comparatively, both approaches created the intermediate output objects (but only n x k size). Certainly that part could perhaps be further optimized. However, the BLAS3 approach still had around 20% GC time vs around 12% from this approach. Each gemm does indeed require a large intermediate array and this seems to cause additional GC time (whether directly or indirectly).

Even without that this approach is a lot faster than gemm and sort for the top-k by row. I'm sure the per-row top-k can be made a lot more efficient and that is worth exploring (though frankly I am doubtful it will result in that much more gain over this approach, relative to the code complexity it will introduce). The small object GC can perhaps be improved with the iterator approach and avoiding creating the output array (that may be good for another 5% or so perhaps?) - this applies to whatever approach is used.


MLnick commented May 10, 2017

It's true, I think my native BLAS is not working, I will have to check - but yeah, 1.5-2x matches what I've seen in my comparisons.


mengxr commented May 10, 2017

@mpjlu Could you try not linking with native BLAS or system BLAS in your test? Just let it fall back to f2j BLAS. I can do some tests on my end too.


mpjlu commented May 11, 2017

F2jBLAS is faster than MKL BLAS. The following test is based on F2jBLAS.
Method 1: BLAS-3 + quickselect on each row to select the top k.
Method 2: this PR

Block size: 256  512  1024  2048  4096  8192
Method 1:   48s  41s  36s   88s   NA    NA
Method 2:   NA   NA   34s   35s   34s   34s

NA means that case was not tested.

3 workers: each worker has 40 cores, 120GB memory, and 1 executor.
The data: 480,000 users and 17,000 items.


MLnick commented May 11, 2017

@mpjlu do you have a link to the code for Method 1?


mpjlu commented May 11, 2017

import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
import org.apache.spark.util.BoundedPriorityQueue

val srcBlocks = blockify(rank, srcFeatures)
val dstBlocks = blockify(rank, dstFeatures)
val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
  case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
    val m = srcIds.length
    val n = dstIds.length
    // gemm: scores for the whole block pair.
    val ratings = srcFactors.transpose.multiply(dstFactors)
    val output = new Array[(Int, (Int, Double))](m * num)
    var i = 0
    var j = 0
    while (i < ratings.numRows) {
      var k = 0
      while (k < ratings.numCols) {
        pq += dstIds(k) -> ratings(i, k)
        k += 1
      }
      pq.foreach { case (dstId, score) =>
        output(j) = (srcIds(i), (dstId, score))
        j += 1
      }
      i += 1
      pq.clear()
    }
    output.toSeq
}

val userRec = ratings.topByKey(num)(Ordering.by(_._2))

The code is like this. Thanks.


mpjlu commented May 11, 2017

I have not validated whether this code is right; I just tested performance.

asfgit pushed a commit that referenced this pull request May 16, 2017
Small clean ups from #17742 and #17845.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath <[email protected]>

Closes #17919 from MLnick/SPARK-20677-als-perf-followup.

(cherry picked from commit 25b4f41)
Signed-off-by: Nick Pentreath <[email protected]>
ghost pushed a commit to dbtsai/spark that referenced this pull request May 16, 2017
Small clean ups from apache#17742 and apache#17845.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath <[email protected]>

Closes apache#17919 from MLnick/SPARK-20677-als-perf-followup.
robert3005 pushed a commit to palantir/spark that referenced this pull request May 19, 2017
Small clean ups from apache#17742 and apache#17845.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath <[email protected]>

Closes apache#17919 from MLnick/SPARK-20677-als-perf-followup.
liyichao pushed a commit to liyichao/spark that referenced this pull request May 24, 2017
The recommendForAll of MLLIB ALS is very slow.
GC is a key problem of the current method.
The task uses the following code to keep the temp result:
val output = new Array[(Int, (Int, Double))](m*n)
where m = n = 4096 (the default value, with no way to set it),
so output is about 4k * 4k * (4 + 4 + 8) bytes = 256MB. This large allocation causes serious GC problems and frequently OOMs.

Actually, we don't need to save all the temp results. Suppose we recommend the top K (K is about 10 or 20) products for each user; then we only need 4k * K * (4 + 4 + 8) bytes to save the temp result.

The Test Environment:
3 workers: each worker has 10 cores, 30GB memory, and 1 executor.
The Data: 480,000 users and 17,000 items.

BlockSize:     1024  2048  4096  8192
Old method:  245s  332s  488s  OOM
This solution: 121s  118s   117s  120s

The existing UT.

Author: Peng <[email protected]>
Author: Peng Meng <[email protected]>

Closes apache#17742 from mpjlu/OptimizeAls.
liyichao pushed a commit to liyichao/spark that referenced this pull request May 24, 2017
This PR is a `DataFrame` version of apache#17742 for [SPARK-11968](https://issues.apache.org/jira/browse/SPARK-11968), for improving the performance of `recommendAll` methods.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath <[email protected]>

Closes apache#17845 from MLnick/ml-als-perf.
liyichao pushed a commit to liyichao/spark that referenced this pull request May 24, 2017
Small clean ups from apache#17742 and apache#17845.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath <[email protected]>

Closes apache#17919 from MLnick/SPARK-20677-als-perf-followup.

mpjlu commented Jul 4, 2017

I found why F2j BLAS is much faster than native BLAS for xiangrui's method (using GEMM) here:
https://issues.apache.org/jira/browse/SPARK-21305


mpjlu commented Jul 12, 2017

I have rewritten recommendForAll with BLAS GEMM and got about a 50% performance improvement.
https://issues.apache.org/jira/browse/SPARK-21389


mpjlu commented Jul 14, 2017

I have submitted a PR for the ALS optimization with GEMM, and it is ready for review.
The performance is about a 50% improvement compared with the master method.
#18624
