@@ -241,16 +241,24 @@ object LBFGS extends Logging {
       val bcW = data.context.broadcast(w)
       val localGradient = gradient

-      val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
-          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
-            val l = localGradient.compute(
-              features, label, bcW.value, grad)
-            (grad, loss + l)
-          },
-          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
-            axpy(1.0, grad2, grad1)
-            (grad1, loss1 + loss2)
-          })
+      val seqOp = (c: (Vector, Double), v: (Double, Vector)) =>
+        (c, v) match {
+          case ((grad, loss), (label, features)) =>
+            val denseGrad = grad.toDense
+            val l = localGradient.compute(features, label, bcW.value, denseGrad)
+            (denseGrad, loss + l)
+        }
+
+      val combOp = (c1: (Vector, Double), c2: (Vector, Double)) =>
+        (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
+          val denseGrad1 = grad1.toDense
+          val denseGrad2 = grad2.toDense
+          axpy(1.0, denseGrad2, denseGrad1)
+          (denseGrad1, loss1 + loss2)
+        }
+
+      val zeroSparseVector = Vectors.sparse(n, Seq())
+      val (gradientSum, lossSum) = data.treeAggregate((zeroSparseVector, 0.0))(seqOp, combOp)
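For context on why the sparse zero value and the defensive `toDense` calls go together, here is a minimal standalone sketch (an illustration, not the PR's code; it assumes a live `SparkContext` named `sc`, e.g. in spark-shell). With one element parallelized into two partitions, one partition is empty; `seqOp` never runs there, so `combOp` receives the untouched sparse seed.

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}

val n = 3
// One element in two partitions: one partition is empty.
val vecs = sc.parallelize(Seq(Vectors.dense(1.0, 2.0, 3.0)), numSlices = 2)

// An all-zeros sparse vector is cheap to serialize into every task,
// unlike a dense zero vector of dimension n.
val zero: Vector = Vectors.sparse(n, Seq())

val sum = vecs.treeAggregate(zero)(
  seqOp = (acc, v) => {
    val dense = acc.toDense  // densify the seed the first time a partition sees data
    v.foreachActive((i, value) => dense.values(i) += value)
    dense
  },
  combOp = (a, b) => {
    val da = a.toDense       // `a` may still be the sparse seed if its partition was empty
    b.foreachActive((i, value) => da.values(i) += value)
    da
  })
assert(sum == Vectors.dense(1.0, 2.0, 3.0))
```

Seeding `treeAggregate` with `Vectors.sparse(n, Seq())` keeps the per-task serialized zero tiny, while the `toDense` conversions make both closures safe whichever representation arrives — the point debated in the thread below.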
Review thread on the defensive `toDense` conversions in `combOp`:

Contributor: This is necessary because we may have empty partitions, right? It might be nice to add a test explicitly for this (it seems it failed in PySpark without it, but that was just a coincidence?) so that someone doesn't remove these lines in the future.

Member: Meaning, when would the args ever not be dense? I agree they shouldn't be sparse at this stage, but doing this defensively seems fine, since it's a no-op for dense vectors.

Author: Yes. Actually, the missing handling of dense vectors was the cause of the PySpark unit-test failure we observed.

Contributor: I am still pretty strongly in favor of adding a test case explicitly for this. Just make an RDD with at least one empty partition, and make sure that LBFGS runs on it.

Author: I'm sorry, I have no clue how to generate a non-empty RDD with an empty partition. Can you please point me at some entry points so that I can contribute the unit test you request?

Member: What about just parallelizing n elements into more than n partitions? At least one partition must be empty.
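A minimal sketch of that suggestion (assuming a live `SparkContext` named `sc`; the element value is illustrative):

```scala
import org.apache.spark.mllib.linalg.Vectors

// Parallelize 1 element into 2 partitions: at least one partition must be empty.
val rdd = sc.parallelize(Seq((1.0, Vectors.dense(2.0))), numSlices = 2)

// glom() turns each partition into an array, so the layout can be inspected.
val partitionSizes = rdd.glom().map(_.length).collect()
assert(partitionSizes.sorted.sameElements(Array(0, 1)))  // one empty, one holding the element
```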

Member: @AnthonyTruchet does that let you add a quick test case? Then I think this can be merged.

Author: I'll try to work on this in the next two days. I will not be able to relaunch the actual benchmark we did weeks ago, though; our internal codebase has changed too much.

Contributor: I attempted to submit a PR to your branch, but with the combination of sketchy wifi and git, I'm not sure it worked.

Author: I did get it, thanks, and I merged it. I was just coming back to this contribution when I saw it.

       /**
        * regVal is sum of weight squares if it's L2 updater;
@@ -230,6 +230,25 @@ class LBFGSSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers
       (weightLBFGS(0) ~= weightGD(0) relTol 0.02) && (weightLBFGS(1) ~= weightGD(1) relTol 0.02),
       "The weight differences between LBFGS and GD should be within 2%.")
   }
+
+  test("SPARK-18471: LBFGS aggregator on empty partitions") {
+    val regParam = 0
+
+    val initialWeightsWithIntercept = Vectors.dense(0.0)
+    val convergenceTol = 1e-12
+    val numIterations = 1
+    val dataWithEmptyPartitions = sc.parallelize(Seq((1.0, Vectors.dense(2.0))), 2)
+
+    LBFGS.runLBFGS(
+      dataWithEmptyPartitions,
+      gradient,
+      simpleUpdater,
+      numCorrections,
+      convergenceTol,
+      numIterations,
+      regParam,
+      initialWeightsWithIntercept)
+  }
 }

 class LBFGSClusterSuite extends SparkFunSuite with LocalClusterSparkContext {
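Note that the new test makes no explicit assertion: it passes as long as `LBFGS.runLBFGS` completes without throwing on the two-partition RDD, one of whose partitions is empty — exactly the failure mode the defensive `toDense` conversions guard against.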