From 0ce8c644e3cbed3cf600fcb30526e46a8054e498 Mon Sep 17 00:00:00 2001 From: Anthony Truchet Date: Wed, 30 Nov 2016 12:21:47 +0100 Subject: [PATCH 1/4] [SPARK-18471][MLLIB] In LBFGS, avoid sending huge vectors of 0 CostFun used to send a dense vector of zeroes as a closure in a treeAggregate call. To avoid that, we ensure the zero is sent as a SparseVector but that it is converted into a DenseVector immediately in seqOp, thus effectively generating a (potentially huge) vector of zeros on the executors only. --- .../spark/mllib/optimization/LBFGS.scala | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala index 900eec18489c..e0fbb3d289d8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala @@ -241,16 +241,25 @@ object LBFGS extends Logging { val bcW = data.context.broadcast(w) val localGradient = gradient - val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))( - seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) => - val l = localGradient.compute( - features, label, bcW.value, grad) - (grad, loss + l) - }, - combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => + // Given (current accumulated gradient, current loss) and (label, features) + // tuples, updates the current gradient and current loss + val seqOp = (c: (Vector, Double), v: (Double, Vector)) => + (c, v) match { + case ((grad, loss), (label, features)) => + val denseGrad = grad.toDense + val l = localGradient.compute(features, label, bcW.value, denseGrad) + (denseGrad, loss + l) + } + + // Adds two (gradient, loss) tuples + val combOp = (c1: (Vector, Double), c2: (Vector, Double)) => + (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => axpy(1.0, grad2, grad1) 
(grad1, loss1 + loss2) - }) + } + + val zeroSparseVector = Vectors.sparse(n, Seq()) + val (gradientSum, lossSum) = data.treeAggregate(zeroSparseVector, 0.0)(seqOp, combOp) /** * regVal is sum of weight squares if it's L2 updater; From 9b30c5c8a01f7d3408eae9c27e2e3b52b216a0b2 Mon Sep 17 00:00:00 2001 From: Anthony Truchet Date: Thu, 1 Dec 2016 11:32:07 +0100 Subject: [PATCH 2/4] Fix error triggered in PySpark --- .../scala/org/apache/spark/mllib/optimization/LBFGS.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala index e0fbb3d289d8..8abb88da7441 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala @@ -254,8 +254,10 @@ object LBFGS extends Logging { // Adds two (gradient, loss) tuples val combOp = (c1: (Vector, Double), c2: (Vector, Double)) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => - axpy(1.0, grad2, grad1) - (grad1, loss1 + loss2) + val denseGrad1 = grad1.toDense + val denseGrad2 = grad2.toDense + axpy(1.0, denseGrad2, denseGrad1) + (denseGrad1, loss1 + loss2) } val zeroSparseVector = Vectors.sparse(n, Seq()) From 3b59ab283797bb8448b2b4384caa0fb12ae9fece Mon Sep 17 00:00:00 2001 From: Anthony Truchet Date: Tue, 6 Dec 2016 10:27:10 +0100 Subject: [PATCH 3/4] Stylistic review taken into account --- .../scala/org/apache/spark/mllib/optimization/LBFGS.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala index 8abb88da7441..a163d72d5d35 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala @@ -241,8 +241,6 @@ object LBFGS 
extends Logging { val bcW = data.context.broadcast(w) val localGradient = gradient - // Given (current accumulated gradient, current loss) and (label, features) - // tuples, updates the current gradient and current loss val seqOp = (c: (Vector, Double), v: (Double, Vector)) => (c, v) match { case ((grad, loss), (label, features)) => @@ -251,7 +249,6 @@ object LBFGS extends Logging { (denseGrad, loss + l) } - // Adds two (gradient, loss) tuples val combOp = (c1: (Vector, Double), c2: (Vector, Double)) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => val denseGrad1 = grad1.toDense @@ -261,7 +258,7 @@ object LBFGS extends Logging { } val zeroSparseVector = Vectors.sparse(n, Seq()) - val (gradientSum, lossSum) = data.treeAggregate(zeroSparseVector, 0.0)(seqOp, combOp) + val (gradientSum, lossSum) = data.treeAggregate((zeroSparseVector, 0.0))(seqOp, combOp) /** * regVal is sum of weight squares if it's L2 updater; From d87fc464f24ec9194233eff8035b55474cd4b150 Mon Sep 17 00:00:00 2001 From: sethah Date: Mon, 12 Dec 2016 22:50:35 +0800 Subject: [PATCH 4/4] add test --- .../spark/mllib/optimization/LBFGSSuite.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala index 75ae0eb32fb7..572959200f47 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala @@ -230,6 +230,25 @@ class LBFGSSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers (weightLBFGS(0) ~= weightGD(0) relTol 0.02) && (weightLBFGS(1) ~= weightGD(1) relTol 0.02), "The weight differences between LBFGS and GD should be within 2%.") } + + test("SPARK-18471: LBFGS aggregator on empty partitions") { + val regParam = 0 + + val initialWeightsWithIntercept = Vectors.dense(0.0) + val convergenceTol = 1e-12 + val 
numIterations = 1 + val dataWithEmptyPartitions = sc.parallelize(Seq((1.0, Vectors.dense(2.0))), 2) + + LBFGS.runLBFGS( + dataWithEmptyPartitions, + gradient, + simpleUpdater, + numCorrections, + convergenceTol, + numIterations, + regParam, + initialWeightsWithIntercept) + } } class LBFGSClusterSuite extends SparkFunSuite with LocalClusterSparkContext {