Conversation

@AnthonyTruchet

What changes were proposed in this pull request?

CostFun used to send a dense vector of zeroes as a closure in a
treeAggregate call. To avoid that, we replace treeAggregate by
mapPartition + treeReduce, creating a zero vector inside the mapPartition
block in-place.

How was this patch tested?

Unit tests for the mllib module were run locally for correctness.

As for performance, we ran a heavy optimization on our production data (50 iterations on 128 MB weight vectors) and saw a significant decrease both in runtime and in containers being killed for lack of off-heap memory.

@AnthonyTruchet
Author

See #15963 for the first feedback on these changes.

@srowen
Member

srowen commented Nov 28, 2016

OK, this is the fourth pull request though (not counting a not-quite-related 5th). You don't need to open a new PR to push more changes and it adds to the difficulty in reviewing. This still doesn't incorporate suggestions from the last review.

@srowen
Member

srowen commented Nov 30, 2016

Following #16038 I suggest this proceed by making the zero value a sparse vector, and then making it dense in the seqOp immediately.

@MLnick
Contributor

MLnick commented Nov 30, 2016

This is all a bit confusing - can we highlight which PR is actually to be reviewed?

@srowen
Member

srowen commented Nov 30, 2016

This should be the main PR @MLnick

@MLnick
Contributor

MLnick commented Nov 30, 2016

Right ok. So I think the approach of making the zero vector sparse then calling toDense in seqOp as @srowen suggested makes most sense.

Currently the gradient vector must be dense in MLlib, since both axpy and the logic for multinomial logreg require it. So the thing that is initially serialized with the task should be tiny, and the call to toDense for the first instance in each partition will essentially generate the dense zero vector. Thereafter it should be a no-op, as the vector will be dense and toDense will just be a ref to the values array.

Can we see if this works:

      val zeroVector = Vectors.sparse(n, Seq())
      val (gradientSum, lossSum) = data.treeAggregate((zeroVector, 0.0))(
          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
            val denseGrad = grad.toDense
            val l = localGradient.compute(
              features, label, bcW.value, denseGrad)
            (denseGrad, loss + l)
          },
          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
            axpy(1.0, grad2, grad1)
            (grad1, loss1 + loss2)
          })

@MLnick
Contributor

MLnick commented Nov 30, 2016

What worries me more, actually, is that the initial vector, when sent in the closure, should be compressed. So why is this issue occurring? Is it a problem with serialization / compression? Or is it still too large even after compression? It would be good to understand that.

@AnthonyTruchet
Author

Hello @MLnick,

Sorry that my push of a new version just crossed with your suggestion.

I'll test your suggestion. Thanks for writing it out; I hadn't realized that this was what Sean's hint meant.

@AnthonyTruchet
Author

AnthonyTruchet commented Nov 30, 2016

@MLnick I have to add a conversion of the denseGrad returned by seqOp from DenseVector back to Vector. The only ways I could find to do this with the API are Vectors.fromBreeze(denseGrad.asBreeze) or Vectors.dense(denseGrad.values); I'm a bit concerned about a potential copy of a 128 MB piece of memory... Anyway, here is the update; benchmark in progress.
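
A minimal sketch of the Vectors.dense option, assuming current MLlib internals (the variable names here are made up for illustration):

    import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}

    // Stand-in for the accumulated dense gradient.
    val denseGrad = new DenseVector(new Array[Double](4))

    // Vectors.dense wraps the existing values array in a new DenseVector, so the
    // (potentially 128 MB) buffer is shared rather than copied; only a tiny
    // wrapper object is allocated.
    val widened: Vector = Vectors.dense(denseGrad.values)

    // Since DenseVector already extends Vector, a plain type ascription compiles
    // as well, so the extra conversion may turn out to be unnecessary.
    val ascribed: Vector = denseGrad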

@AnthonyTruchet force-pushed the ENG-17719-lbfgs-only branch 3 times, most recently from 51b8dd5 to d7ebc7d on November 30, 2016 at 13:26
Contributor

Is this really necessary?

Contributor

I can compile with my example code...

@MLnick
Contributor

MLnick commented Dec 1, 2016

ok to test

@SparkQA

SparkQA commented Dec 1, 2016

Test build #69463 has finished for PR 16037 at commit d7ebc7d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

CostFun used to send a dense vector of zeroes as a closure in a
treeAggregate call. To avoid that, we ensure the zero is sent as a
SparseVector and that it is converted into a DenseVector immediately in
seqOp, thus effectively generating a (potentially huge) vector of zeroes on
the executors only.
@AnthonyTruchet
Author

Scala style checks fixed, and spurious cast removed.

Member

@srowen left a comment

Yes this LGTM. Let's make sure it passes tests. CC maybe @dbtsai FYI

@SparkQA

SparkQA commented Dec 1, 2016

Test build #69466 has finished for PR 16037 at commit 0ce8c64.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AnthonyTruchet
Author

This seems very related to the change. I know nothing about the way PySpark interfaces Python and Scala, and I'm surprised that changing an internal of the Scala lib causes this? Ready to learn and help though :-)

File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/mllib/classification.py", line 155, in __main__.LogisticRegressionModel
Failed example:
    mcm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3)
Exception raised:
    Traceback (most recent call last):
      File "/usr/lib64/python2.6/doctest.py", line 1253, in __run
        compileflags, 1) in test.globs
      File "<doctest __main__.LogisticRegressionModel[23]>", line 1, in <module>
        mcm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3)
      [...]
    Py4JJavaError: An error occurred while calling o173.trainLogisticRegressionModelWithLBFGS.
     [...]
Caused by: java.lang.IllegalArgumentException: axpy only supports adding to a dense vector but got type class org.apache.spark.mllib.linalg.SparseVector.
    	at org.apache.spark.mllib.linalg.BLAS$.axpy(BLAS.scala:58)
    	at org.apache.spark.mllib.optimization.LBFGS$CostFun$$anonfun$2.apply(LBFGS.scala:257)
    	at org.apache.spark.mllib.optimization.LBFGS$CostFun$$anonfun$2.apply(LBFGS.scala:255)

Contributor

I think grad1 here will need to be denseGrad1?

Author

Done, thanks for spotting it.

@SparkQA

SparkQA commented Dec 1, 2016

Test build #69469 has finished for PR 16037 at commit ec183a2.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 1, 2016

Test build #69470 has finished for PR 16037 at commit 9b30c5c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MLnick
Contributor

MLnick commented Dec 2, 2016

I'm sure this will be a net positive, and it shouldn't cause any regression. Still, we must be certain. @AnthonyTruchet can you provide for posterity the detailed test results for the vector sizes you mentioned? And perhaps also some results for smaller sizes (since I imagine the benefit of this change in that scenario is quite small, we should just check there is no unexpected overhead or regression we've somehow missed from the toDense calls, though I can't see that happening).

@AnthonyTruchet
Author

L-BFGS is the only optimizer I've used so far. I'm not sure how much time I can free to take care of the other ones, but I'll try :-)

Regarding the benchmark, I'll check if we have archived the results; otherwise I'll relaunch it next week.

@srowen
Member

srowen commented Dec 3, 2016

Yes I'm pretty OK with merging this. If you can dig up any results, that's all the better. Will check in with you next week.

}

val zeroSparseVector = Vectors.sparse(n, Seq())
val (gradientSum, lossSum) = data.treeAggregate(zeroSparseVector, 0.0)(seqOp, combOp)
Contributor

I'm slightly in favor of keeping the parentheses for the zero values. If you don't know the signature of treeAggregate, and that the Scala compiler evidently packs these into a tuple, you may be confused about what the second argument is. Not a strong preference.

Author

You mean data.treeAggregate((zeroSparseVector, 0.0)), don't you?

Contributor

Yes :)
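
For reference, the explicit-tuple form being discussed, as a sketch reusing data, n, seqOp and combOp from the snippet above:

    // Auto-tupling the two zero arguments, as in
    //   data.treeAggregate(zeroSparseVector, 0.0)(seqOp, combOp)
    // can trigger an argument-adaptation warning; passing the pair explicitly
    // avoids that and makes the zero value obvious.
    val zeroSparseVector = Vectors.sparse(n, Seq())
    val (gradientSum, lossSum) =
      data.treeAggregate((zeroSparseVector, 0.0))(seqOp, combOp)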

// Adds two (gradient, loss) tuples
val combOp = (c1: (Vector, Double), c2: (Vector, Double)) =>
  (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
    val denseGrad1 = grad1.toDense
Contributor

This is necessary because we may have empty partitions, right? Might be nice to add a test explicitly for this (it seems it failed in pyspark without it, but that was just a coincidence?) so someone doesn't remove these lines in the future.

Member

Meaning, when would the args ever not be dense? I agree, shouldn't be sparse at this stage, but doing this defensively seems fine since it's a no-op for dense.

Author

Yes. Actually, missing this handling of the dense vector was the cause of the PySpark unit test failure we observed.

(denseGrad, loss + l)
}

// Adds two (gradient, loss) tuples
Contributor

I don't think this comment is necessary. The semantics of a combine op should be well understood.

Author

OK

@sethah
Contributor

sethah commented Dec 4, 2016

@MLnick Yeah, this is likely a problem with all the ML aggregators as well. We can probably take care of it using lazy evaluation.
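
Roughly what I have in mind, as a hedged, illustrative sketch rather than the actual ML aggregator code:

    // The large gradient buffer is a lazy val, so the "empty" aggregator serialized
    // from the driver does not carry a numFeatures-sized array of zeroes; the array
    // is only allocated on first use, i.e. on the executors.
    class GradientAggregatorSketch(numFeatures: Int) extends Serializable {
      private var lossSum = 0.0
      private var count = 0L
      // Not materialized until add() first touches it.
      private lazy val gradientSumArray = new Array[Double](numFeatures)

      def add(perExampleGradient: Array[Double], loss: Double): this.type = {
        var i = 0
        while (i < numFeatures) {
          gradientSumArray(i) += perExampleGradient(i)
          i += 1
        }
        lossSum += loss
        count += 1
        this
      }

      def gradient: Array[Double] = gradientSumArray
      def totalLoss: Double = lossSum
    }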

Member

@srowen left a comment

Agree with @sethah's two comments regarding the comment and the arg to treeAggregate (which generates a warning, I think). Then I think it's good to go.


@AnthonyTruchet
Author

@sethah Agreed, that's why I suggested adding a dedicated treeAggregate wrapper to MLlib Utils which would take care of that without fiddling with sparsity in each seqOp and combOp. See closed #16078 for the idea...

@SparkQA

SparkQA commented Dec 6, 2016

Test build #69722 has finished for PR 16037 at commit 3b59ab2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


val combOp = (c1: (Vector, Double), c2: (Vector, Double)) =>
  (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
    val denseGrad1 = grad1.toDense
Contributor

I am still pretty strongly in favor of adding a test case explicitly for this. Just make an RDD with at least one empty partition, and be sure that LBFGS will run on it.

Author

I'm sorry, I have no clue how to generate a non-empty RDD with an empty partition. Can you please point me at some entry points so that I can contribute the unit test you request?

Member

What about just parallelizing n elements into more than n partitions? At least one partition must be empty.
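
Something along these lines, as a rough sketch (assuming sc is the test suite's SparkContext; not necessarily the exact test that was added):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}

    // One labeled point spread over more partitions than elements, so at least one
    // partition is guaranteed to be empty.
    val data = sc.parallelize(Seq((1.0, Vectors.dense(1.0, 0.0))), numSlices = 3)

    // Should run to completion; without the toDense handling, the sparse zero from
    // an empty partition would reach axpy in combOp and throw IllegalArgumentException.
    val (weights, losses) = LBFGS.runLBFGS(
      data,
      new LogisticGradient(),
      new SquaredL2Updater(),
      10,                        // numCorrections
      1e-4,                      // convergenceTol
      5,                         // maxNumIterations
      0.0,                       // regParam
      Vectors.dense(0.0, 0.0))   // initialWeights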

Member

@AnthonyTruchet does that let you add a quick test case? Then I think this can be merged.

Author

I'll try to work on this in the next two days. I will not be able to relaunch the actual benchmark we did weeks ago; our internal codebase has changed too much.

Contributor

I attempted to submit a PR to your branch, but with the combination of sketchy wifi and git, I'm not sure it worked.

Author

It did, thanks, and I merged it. I was just coming back to this contribution when I saw it.

@SparkQA

SparkQA commented Dec 13, 2016

Test build #70081 has finished for PR 16037 at commit 18fcbba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sethah
Contributor

sethah commented Dec 13, 2016

LGTM

@srowen
Member

srowen commented Dec 13, 2016

Merged to master

@asfgit asfgit closed this in 9e8a9d7 Dec 13, 2016
@AnthonyTruchet
Author

Thanks for keeping up with this merge request; I've learned a lot about the contribution process and good practices, and the next contribution will hopefully be much more straightforward. Thanks to the Spark committer team for this great piece of software :-)

@dbtsai
Member

dbtsai commented Dec 15, 2016

Sorry for the late review. Just came back to the US. LGTM too! Thanks.

robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
## What changes were proposed in this pull request?

CostFun used to send a dense vector of zeroes as a closure in a
treeAggregate call. To avoid that, we replace treeAggregate by
mapPartition + treeReduce, creating a zero vector inside the mapPartition
block in-place.

## How was this patch tested?

Unit tests for the mllib module were run locally for correctness.

As for performance, we ran a heavy optimization on our production data (50 iterations on 128 MB weight vectors) and saw a significant decrease both in runtime and in containers being killed for lack of off-heap memory.

Author: Anthony Truchet <[email protected]>
Author: sethah <[email protected]>
Author: Anthony Truchet <[email protected]>

Closes apache#16037 from AnthonyTruchet/ENG-17719-lbfgs-only.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
asfgit pushed a commit that referenced this pull request Mar 3, 2017
## What changes were proposed in this pull request?

JIRA: [SPARK-19745](https://issues.apache.org/jira/browse/SPARK-19745)

Reorganize SVCAggregator to avoid serializing coefficients. This patch also makes the gradient array a `lazy val` which will avoid materializing a large array on the driver before shipping the class to the executors. This improvement stems from #16037. Actually, probably all ML aggregators can benefit from this.

We can either: a) separate the gradient improvement into another patch, b) keep what's here _plus_ add the lazy evaluation to all other aggregators in this patch, or c) keep it as is.

## How was this patch tested?

This is an interesting question! I don't know of a reasonable way to test this right now. Ideally, we could perform an optimization and look at the shuffle write data for each task, and we could compare the size to what we know it should be: `numCoefficients * 8 bytes`. Not sure if there is a good way to do that right now? We could discuss this here or in another JIRA, but I suspect it would be a significant undertaking.

Author: sethah <[email protected]>

Closes #17076 from sethah/svc_agg.
