From e1db950e91c7d9526519626aa252cd711307d857 Mon Sep 17 00:00:00 2001 From: Li Pu Date: Tue, 3 Jun 2014 18:05:18 -0700 Subject: [PATCH 01/17] SPARK-1782: svd for sparse matrix using ARPACK copy ARPACK dsaupd/dseupd code from latest breeze change RowMatrix to use sparse SVD change tests for sparse SVD --- .../linalg/EigenValueDecomposition.scala | 120 ++++++++++++++++++ .../mllib/linalg/distributed/RowMatrix.scala | 42 ++++-- .../linalg/distributed/RowMatrixSuite.scala | 10 +- 3 files changed, 153 insertions(+), 19 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala new file mode 100644 index 0000000000000..7f07eb8768e97 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.apache.spark.annotation.Experimental +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV} +import org.netlib.util.{intW, doubleW} +import com.github.fommil.netlib.ARPACK + +/** + * :: Experimental :: + * Represents eigenvalue decomposition factors. + */ +@Experimental +case class EigenValueDecomposition[VType](s: Vector, V: VType) + +object EigenValueDecomposition { + /** + * Compute the leading k eigenvalues and eigenvectors on a symmetric square matrix using ARPACK. + * The caller needs to ensure that the input matrix is real symmetric. This function requires + * memory for `n*(4*k+4)` doubles. + * + * @param mul a function that multiplies the symmetric matrix with a Vector. + * @param n dimension of the square matrix (maximum Int.MaxValue). + * @param k number of leading eigenvalues required. + * @param tol tolerance of the eigs computation. + * @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors + * (columns of the matrix). The number of computed eigenvalues might be smaller than k. + */ + private[mllib] def symmetricEigs(mul: Vector => Vector, n: Int, k: Int, tol: Double) + : (BDV[Double], BDM[Double]) = { + require(n > k, s"Number of required eigenvalues $k must be smaller than matrix dimension $n") + + val arpack = ARPACK.getInstance() + + val tolW = new doubleW(tol) + val nev = new intW(k) + val ncv = scala.math.min(2*k,n) + + val bmat = "I" + val which = "LM" + + var iparam = new Array[Int](11) + iparam(0) = 1 + iparam(2) = 300 + iparam(6) = 1 + + var ido = new intW(0) + var info = new intW(0) + var resid:Array[Double] = new Array[Double](n) + var v = new Array[Double](n*ncv) + var workd = new Array[Double](3*n) + var workl = new Array[Double](ncv*(ncv+8)) + var ipntr = new Array[Int](11) + + // first call to ARPACK + arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd, + workl, workl.length, info) + + val w = BDV(workd) + + while(ido.`val` !=99) { + if (ido.`val` != -1 && ido.`val` != 1) + throw new IllegalStateException("ARPACK returns ido = " + ido.`val`) + // multiply working vector with the matrix + val inputOffset = ipntr(0) - 1 + val outputOffset = ipntr(1) - 1 + val x = w(inputOffset until inputOffset + n) + val y = w(outputOffset until outputOffset + n) + y := BDV(mul(Vectors.fromBreeze(x)).toArray) + // call ARPACK + arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, + workd, workl, workl.length, info) + } + + if (info.`val` != 0) + throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val`) + + val d = new Array[Double](nev.`val`) + val select = new Array[Boolean](ncv) + val z = java.util.Arrays.copyOfRange(v, 0, nev.`val` * n) + + arpack.dseupd(true, "A", select, d, z, n, 0.0, bmat, n, which, nev, tol, resid, ncv, v, n, + iparam, ipntr, workd, workl, workl.length, info) + + val computed = iparam(4) + + val s = BDV(d)(0 until computed) + val U = new BDM(n, computed, z) + + val sortedEigenValuesWithIndex = s.toArray.zipWithIndex.sortBy(-1 * _._1).zipWithIndex + + val sorteds = BDV(sortedEigenValuesWithIndex.map(_._1._1)) + val sortedU = BDM.zeros[Double](n, computed) + + // copy eigenvectors in descending order of eigenvalues + sortedEigenValuesWithIndex.map{ + r => { + sortedU(::, r._2) := U(::, r._1._2) + } + } + + (sorteds, sortedU) + } +} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 07dfadf2f7869..fd653a367a81d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -200,6 +200,19 @@ class RowMatrix( nRows } + /** + * Multiply the Gramian matrix `A^T A` by a Vector on the right. + * + * @param v a local vector whose length must match the number of columns of this matrix + * @return a local DenseVector representing the product + */ + private[mllib] def multiplyGramianMatrix(v: Vector): Vector = { + val bv = rows.map{ + row => row.toBreeze * row.toBreeze.dot(v.toBreeze) + }.reduce( (x: BV[Double], y: BV[Double]) => x + y ) + Vectors.fromBreeze(bv) + } + /** * Computes the Gramian matrix `A^T A`. */ @@ -221,15 +234,17 @@ class RowMatrix( /** * Computes the singular value decomposition of this matrix. - * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'. + * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A ~= U * S * V', + * where S contains the leading singular values, U and V contain the corresponding singular + * vectors. * - * There is no restriction on m, but we require `n^2` doubles to fit in memory. - * Further, n should be less than m. - - * The decomposition is computed by first computing A'A = V S^2 V', - * computing svd locally on that (since n x n is small), from which we recover S and V. - * Then we compute U via easy matrix multiplication as U = A * (V * S^-1). - * Note that this approach requires `O(n^3)` time on the master node. + * There is no restriction on m, but we require `n*(6*k+4)` doubles to fit in memory on the master + * node. Further, n should be less than m. + * + * The decomposition is computed by providing a function that multiples a vector with A'A to + * ARPACK, and iteratively invoking ARPACK-dsaupd on master node, from which we recover S and V. + * Then we compute U via easy matrix multiplication as U = A * (V * S-1). + * Note that this approach requires `O(nnz(A))` time. * * At most k largest non-zero singular values and associated vectors are returned. * If there are k such values, then the dimensions of the return will be: @@ -243,20 +258,19 @@ class RowMatrix( * @param computeU whether to compute U * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) * are treated as zero, where sigma(0) is the largest singular value. + * @param tol the tolerance of the svd computation. * @return SingularValueDecomposition(U, s, V) */ def computeSVD( k: Int, computeU: Boolean = false, - rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { + rCond: Double = 1e-9, + tol: Double = 1e-6): SingularValueDecomposition[RowMatrix, Matrix] = { val n = numCols().toInt require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") - val G = computeGramianMatrix() - - // TODO: Use sparse SVD instead. - val (u: BDM[Double], sigmaSquares: BDV[Double], v: BDM[Double]) = - brzSvd(G.toBreeze.asInstanceOf[BDM[Double]]) + val (sigmaSquares: BDV[Double], u: BDM[Double]) = + EigenValueDecomposition.symmetricEigs(multiplyGramianMatrix, n, k, tol) val sigmas: BDV[Double] = brzSqrt(sigmaSquares) // Determine effective rank. diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index c9f9acf4c1335..9c346de3d2fbe 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -99,7 +99,7 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { val localMat = mat.toBreeze() val (localU, localSigma, localVt) = brzSvd(localMat) val localV: BDM[Double] = localVt.t.toDenseMatrix - for (k <- 1 to n) { + for (k <- 1 to (n - 1)) { val svd = mat.computeSVD(k, computeU = true) val U = svd.U val s = svd.s @@ -113,19 +113,19 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) } - val svdWithoutU = mat.computeSVD(n) + val svdWithoutU = mat.computeSVD(n - 1) assert(svdWithoutU.U === null) } } test("svd of a low-rank matrix") { - val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0)), 2) - val mat = new RowMatrix(rows, 4, 2) + val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) + val mat = new RowMatrix(rows, 4, 3) val svd = mat.computeSVD(2, computeU = true) assert(svd.s.size === 1, "should not return zero singular values") assert(svd.U.numRows() === 4) assert(svd.U.numCols() === 1) - assert(svd.V.numRows === 2) + assert(svd.V.numRows === 3) assert(svd.V.numCols === 1) } From 96d2ecb837843651db70d7505ddb73cfc0b0bf9a Mon Sep 17 00:00:00 2001 From: Li Pu Date: Tue, 3 Jun 2014 23:03:35 -0700 Subject: [PATCH 02/17] improve eigenvalue sorting --- .../linalg/EigenValueDecomposition.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index 7f07eb8768e97..fc283cbe1472d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -100,21 +100,22 @@ object EigenValueDecomposition { val computed = iparam(4) - val s = BDV(d)(0 until computed) - val U = new BDM(n, computed, z) - - val sortedEigenValuesWithIndex = s.toArray.zipWithIndex.sortBy(-1 * _._1).zipWithIndex + val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map{ + r => (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n)) + } - val sorteds = BDV(sortedEigenValuesWithIndex.map(_._1._1)) - val sortedU = BDM.zeros[Double](n, computed) + val sortedEigenPairs = eigenPairs.sortBy(-1 * _._1) // copy eigenvectors in descending order of eigenvalues - sortedEigenValuesWithIndex.map{ + val sortedU = BDM.zeros[Double](n, computed) + sortedEigenPairs.zipWithIndex.map{ r => { - sortedU(::, r._2) := U(::, r._1._2) + for (i <- 0 until n) { + sortedU.data(r._2 * n + i) = r._1._2(i) + } } } - (sorteds, sortedU) + (BDV(sortedEigenPairs.map(_._1)), sortedU) } } \ No newline at end of file From fe983b0e7d62359275a92c2adaae8a635d7dd5d8 Mon Sep 17 00:00:00 2001 From: Li Pu Date: Wed, 4 Jun 2014 00:01:29 -0700 Subject: [PATCH 03/17] improve scala style --- .../linalg/EigenValueDecomposition.scala | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index fc283cbe1472d..99e104feb2207 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -17,10 +17,11 @@ package org.apache.spark.mllib.linalg -import org.apache.spark.annotation.Experimental import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV} -import org.netlib.util.{intW, doubleW} import com.github.fommil.netlib.ARPACK +import org.netlib.util.{intW, doubleW} + +import org.apache.spark.annotation.Experimental /** * :: Experimental :: @@ -46,11 +47,11 @@ object EigenValueDecomposition { : (BDV[Double], BDM[Double]) = { require(n > k, s"Number of required eigenvalues $k must be smaller than matrix dimension $n") - val arpack = ARPACK.getInstance() + val arpack = ARPACK.getInstance() val tolW = new doubleW(tol) val nev = new intW(k) - val ncv = scala.math.min(2*k,n) + val ncv = scala.math.min(2 * k, n) val bmat = "I" val which = "LM" @@ -63,9 +64,9 @@ object EigenValueDecomposition { var ido = new intW(0) var info = new intW(0) var resid:Array[Double] = new Array[Double](n) - var v = new Array[Double](n*ncv) - var workd = new Array[Double](3*n) - var workl = new Array[Double](ncv*(ncv+8)) + var v = new Array[Double](n * ncv) + var workd = new Array[Double](n * 3) + var workl = new Array[Double](ncv * (ncv + 8)) var ipntr = new Array[Int](11) // first call to ARPACK @@ -74,9 +75,10 @@ object EigenValueDecomposition { val w = BDV(workd) - while(ido.`val` !=99) { - if (ido.`val` != -1 && ido.`val` != 1) + while(ido.`val` != 99) { + if (ido.`val` != -1 && ido.`val` != 1) { throw new IllegalStateException("ARPACK returns ido = " + ido.`val`) + } // multiply working vector with the matrix val inputOffset = ipntr(0) - 1 val outputOffset = ipntr(1) - 1 @@ -88,8 +90,9 @@ object EigenValueDecomposition { workd, workl, workl.length, info) } - if (info.`val` != 0) + if (info.`val` != 0) { throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val`) + } val d = new Array[Double](nev.`val`) val select = new Array[Boolean](ncv) @@ -118,4 +121,4 @@ object EigenValueDecomposition { (BDV(sortedEigenPairs.map(_._1)), sortedU) } -} \ No newline at end of file +} From 9c8051594a88b53ce83b39b127a098b31bd89aad Mon Sep 17 00:00:00 2001 From: Li Pu Date: Wed, 4 Jun 2014 01:25:58 -0700 Subject: [PATCH 04/17] use non-sparse implementation when k = n --- .../mllib/linalg/distributed/RowMatrix.scala | 15 +++++++++++++-- .../mllib/linalg/distributed/RowMatrixSuite.scala | 2 +- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index fd653a367a81d..fd599b53bef0a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -246,6 +246,9 @@ class RowMatrix( * Then we compute U via easy matrix multiplication as U = A * (V * S-1). * Note that this approach requires `O(nnz(A))` time. * + * When the requested eigenvalues k = n, a non-sparse implementation will be used, which requires + * `n^2` doubles to fit in memory and `O(n^3)` time on the master node. + * * At most k largest non-zero singular values and associated vectors are returned. * If there are k such values, then the dimensions of the return will be: * @@ -269,8 +272,16 @@ class RowMatrix( val n = numCols().toInt require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") - val (sigmaSquares: BDV[Double], u: BDM[Double]) = + val (sigmaSquares: BDV[Double], u: BDM[Double]) = if (k < n) { EigenValueDecomposition.symmetricEigs(multiplyGramianMatrix, n, k, tol) + } else { + logWarning(s"Request full SVD (k = n = $k), while ARPACK requires k strictly less than n. " + + s"Using non-sparse implementation.") + val G = computeGramianMatrix() + val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) = + brzSvd(G.toBreeze.asInstanceOf[BDM[Double]]) + (sigmaSquaresFull, uFull) + } val sigmas: BDV[Double] = brzSqrt(sigmaSquares) // Determine effective rank. @@ -508,4 +519,4 @@ object RowMatrix { Matrices.dense(n, n, G.data) } -} +} \ No newline at end of file diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 9c346de3d2fbe..8014428b02d59 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -99,7 +99,7 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { val localMat = mat.toBreeze() val (localU, localSigma, localVt) = brzSvd(localMat) val localV: BDM[Double] = localVt.t.toDenseMatrix - for (k <- 1 to (n - 1)) { + for (k <- 1 to n) { val svd = mat.computeSVD(k, computeU = true) val U = svd.U val s = svd.s From 827411b7a7c7a44ec9cf0a3a3439bba0a47575f7 Mon Sep 17 00:00:00 2001 From: Li Pu Date: Wed, 4 Jun 2014 01:29:12 -0700 Subject: [PATCH 05/17] fix EOF new line --- .../org/apache/spark/mllib/linalg/distributed/RowMatrix.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index fd599b53bef0a..cc64266e8b88f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -519,4 +519,4 @@ object RowMatrix { Matrices.dense(n, n, G.data) } -} \ No newline at end of file +} From e7850ed465ceadd6a45132935013292a4845f8df Mon Sep 17 00:00:00 2001 From: Li Pu Date: Wed, 4 Jun 2014 16:56:26 -0700 Subject: [PATCH 06/17] use aggregate and axpy --- .../linalg/EigenValueDecomposition.scala | 7 ++-- .../mllib/linalg/distributed/RowMatrix.scala | 33 +++++++++++++------ 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index 99e104feb2207..f81a8b83cafb8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -36,15 +36,16 @@ object EigenValueDecomposition { * The caller needs to ensure that the input matrix is real symmetric. This function requires * memory for `n*(4*k+4)` doubles. * - * @param mul a function that multiplies the symmetric matrix with a Vector. + * @param mul a function that multiplies the symmetric matrix with a DenseVector. * @param n dimension of the square matrix (maximum Int.MaxValue). * @param k number of leading eigenvalues required. * @param tol tolerance of the eigs computation. * @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors * (columns of the matrix). The number of computed eigenvalues might be smaller than k. */ - private[mllib] def symmetricEigs(mul: Vector => Vector, n: Int, k: Int, tol: Double) + private[mllib] def symmetricEigs(mul: DenseVector => DenseVector, n: Int, k: Int, tol: Double) : (BDV[Double], BDM[Double]) = { + // TODO: remove this function and use eigs in breeze when switching breeze version require(n > k, s"Number of required eigenvalues $k must be smaller than matrix dimension $n") val arpack = ARPACK.getInstance() @@ -84,7 +85,7 @@ object EigenValueDecomposition { val outputOffset = ipntr(1) - 1 val x = w(inputOffset until inputOffset + n) val y = w(outputOffset until outputOffset + n) - y := BDV(mul(Vectors.fromBreeze(x)).toArray) + y := BDV(mul(Vectors.fromBreeze(x).asInstanceOf[DenseVector]).toArray) // call ARPACK arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd, workl, workl.length, info) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index cc64266e8b88f..c24142f44ddcf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -19,7 +19,8 @@ package org.apache.spark.mllib.linalg.distributed import java.util -import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd} +import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV} +import breeze.linalg.{svd => brzSvd, axpy => brzAxpy} import breeze.numerics.{sqrt => brzSqrt} import com.github.fommil.netlib.BLAS.{getInstance => blas} @@ -201,16 +202,28 @@ class RowMatrix( } /** - * Multiply the Gramian matrix `A^T A` by a Vector on the right. + * Multiply the Gramian matrix `A^T A` by a DenseVector on the right. * - * @param v a local vector whose length must match the number of columns of this matrix - * @return a local DenseVector representing the product + * @param v a local DenseVector whose length must match the number of columns of this matrix. + * @return a local DenseVector representing the product. */ - private[mllib] def multiplyGramianMatrix(v: Vector): Vector = { - val bv = rows.map{ - row => row.toBreeze * row.toBreeze.dot(v.toBreeze) - }.reduce( (x: BV[Double], y: BV[Double]) => x + y ) - Vectors.fromBreeze(bv) + private[mllib] def multiplyGramianMatrix(v: DenseVector): DenseVector = { + val n = numCols().toInt + + val bv = rows.aggregate(BDV.zeros[Double](n))( + seqOp = (U, r) => { + val rBrz = r.toBreeze + val a = rBrz.dot(v.toBreeze) + rBrz match { + case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U) + case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U) + } + U + }, + combOp = (U1, U2) => U1 += U2 + ) + + new DenseVector(bv.data) } /** @@ -243,7 +256,7 @@ class RowMatrix( * * The decomposition is computed by providing a function that multiples a vector with A'A to * ARPACK, and iteratively invoking ARPACK-dsaupd on master node, from which we recover S and V. - * Then we compute U via easy matrix multiplication as U = A * (V * S-1). + * Then we compute U via easy matrix multiplication as U = A * (V * S^{-1}). * Note that this approach requires `O(nnz(A))` time. * * When the requested eigenvalues k = n, a non-sparse implementation will be used, which requires From 4c7aec3d1c5203b4825047c66bed718211f9446c Mon Sep 17 00:00:00 2001 From: Li Pu Date: Fri, 6 Jun 2014 18:33:47 -0700 Subject: [PATCH 07/17] improve comments --- .../linalg/EigenValueDecomposition.scala | 34 ++++++++++++++++--- .../mllib/linalg/distributed/RowMatrix.scala | 8 +++-- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index f81a8b83cafb8..a571cc8a1c5b6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -50,16 +50,26 @@ object EigenValueDecomposition { val arpack = ARPACK.getInstance() + // tolerance used in stopping criterion val tolW = new doubleW(tol) + // number of desired eigenvalues, 0 < nev < n val nev = new intW(k) + // nev Lanczos vectors are generated are generated in the first iteration + // ncv-nev Lanczos vectors are generated are generated in each subsequent iteration + // ncv must be smaller than n val ncv = scala.math.min(2 * k, n) + // "I" for standard eigenvalue problem, "G" for generalized eigenvalue problem val bmat = "I" + // "LM" : compute the NEV largest (in magnitude) eigenvalues val which = "LM" var iparam = new Array[Int](11) + // use exact shift in each iteration iparam(0) = 1 + // maximum number of Arnoldi update iterations, or the actual number of iterations on output iparam(2) = 300 + // Mode 1: A*x = lambda*x, A symmetric iparam(6) = 1 var ido = new intW(0) @@ -70,15 +80,17 @@ object EigenValueDecomposition { var workl = new Array[Double](ncv * (ncv + 8)) var ipntr = new Array[Int](11) - // first call to ARPACK + // call ARPACK's reverse communication, first iteration with ido = 0 arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd, workl, workl.length, info) val w = BDV(workd) - while(ido.`val` != 99) { + // ido = 99 : done flag in reverse communication + while (ido.`val` != 99) { if (ido.`val` != -1 && ido.`val` != 1) { - throw new IllegalStateException("ARPACK returns ido = " + ido.`val`) + throw new IllegalStateException("ARPACK returns ido = " + ido.`val` + + " This flag is not compatible with Mode 1: A*x = lambda*x, A symmetric.") } // multiply working vector with the matrix val inputOffset = ipntr(0) - 1 @@ -86,28 +98,40 @@ object EigenValueDecomposition { val x = w(inputOffset until inputOffset + n) val y = w(outputOffset until outputOffset + n) y := BDV(mul(Vectors.fromBreeze(x).asInstanceOf[DenseVector]).toArray) - // call ARPACK + // call ARPACK's reverse communication arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd, workl, workl.length, info) } if (info.`val` != 0) { - throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val`) + info.`val` match { + case 1 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + + " Maximum number of iterations taken. (Refer ARPACK user guide for details)") + case 2 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + + " No shifts could be applied. Try to increase NCV. " + + "(Refer ARPACK user guide for details)") + case _ => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + + " Please refer ARPACK user guide for error message.") + } } val d = new Array[Double](nev.`val`) val select = new Array[Boolean](ncv) + // copy the Ritz vectors val z = java.util.Arrays.copyOfRange(v, 0, nev.`val` * n) + // call ARPACK's post-processing for eigenvectors arpack.dseupd(true, "A", select, d, z, n, 0.0, bmat, n, which, nev, tol, resid, ncv, v, n, iparam, ipntr, workd, workl, workl.length, info) + // number of computed eigenvalues, might be smaller than k val computed = iparam(4) val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map{ r => (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n)) } + // sort the eigen-pairs in descending order val sortedEigenPairs = eigenPairs.sortBy(-1 * _._1) // copy eigenvectors in descending order of eigenvalues diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index c24142f44ddcf..7e7d6e0d8ab6d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -259,8 +259,9 @@ class RowMatrix( * Then we compute U via easy matrix multiplication as U = A * (V * S^{-1}). * Note that this approach requires `O(nnz(A))` time. * - * When the requested eigenvalues k = n, a non-sparse implementation will be used, which requires - * `n^2` doubles to fit in memory and `O(n^3)` time on the master node. + * ARPACK requires k to be strictly less than n. Thus when the requested eigenvalues k = n, a + * non-sparse implementation will be used, which requires `n^2` doubles to fit in memory and + * `O(n^3)` time on the master node. * * At most k largest non-zero singular values and associated vectors are returned. * If there are k such values, then the dimensions of the return will be: @@ -274,7 +275,8 @@ class RowMatrix( * @param computeU whether to compute U * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) * are treated as zero, where sigma(0) is the largest singular value. - * @param tol the tolerance of the svd computation. + * @param tol the numerical tolerance of svd computation. Larger tolerance means fewer iterations, + * but less accurate result. * @return SingularValueDecomposition(U, s, V) */ def computeSVD( From eb15100052aae878552aa437c41e548243a6a29e Mon Sep 17 00:00:00 2001 From: Li Pu Date: Thu, 12 Jun 2014 23:36:18 -0700 Subject: [PATCH 08/17] fix binary compatibility --- .../linalg/EigenValueDecomposition.scala | 1 + .../mllib/linalg/distributed/RowMatrix.scala | 23 ++++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index a571cc8a1c5b6..e6688c1ccbb22 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -30,6 +30,7 @@ import org.apache.spark.annotation.Experimental @Experimental case class EigenValueDecomposition[VType](s: Vector, V: VType) +@Experimental object EigenValueDecomposition { /** * Compute the leading k eigenvalues and eigenvectors on a symmetric square matrix using ARPACK. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 7e7d6e0d8ab6d..ecd25c1273cb4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -245,6 +245,23 @@ class RowMatrix( RowMatrix.triuToFull(n, GU.data) } + /** + * Computes the singular value decomposition of this matrix, using default tolerance (1e-9). + * + * @param k number of singular values to keep. We might return less than k if there are + * numerically zero singular values. See rCond. + * @param computeU whether to compute U + * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) + * are treated as zero, where sigma(0) is the largest singular value. + * @return SingularValueDecomposition(U, s, V) + */ + def computeSVD( + k: Int, + computeU: Boolean = false, + rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { + computeSVD(k, computeU, rCond, 1e-9) + } + /** * Computes the singular value decomposition of this matrix. * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A ~= U * S * V', @@ -281,9 +298,9 @@ class RowMatrix( */ def computeSVD( k: Int, - computeU: Boolean = false, - rCond: Double = 1e-9, - tol: Double = 1e-6): SingularValueDecomposition[RowMatrix, Matrix] = { + computeU: Boolean, + rCond: Double, + tol: Double): SingularValueDecomposition[RowMatrix, Matrix] = { val n = numCols().toInt require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") From 819824b85acfc8ace9c15e0a9c5ce317604e4f73 Mon Sep 17 00:00:00 2001 From: Li Pu Date: Tue, 17 Jun 2014 19:11:53 -0700 Subject: [PATCH 09/17] add flag for dense svd or sparse svd --- .../linalg/EigenValueDecomposition.scala | 25 ++++---- .../mllib/linalg/distributed/RowMatrix.scala | 31 ++++++---- .../linalg/distributed/RowMatrixSuite.scala | 58 ++++++++++--------- 3 files changed, 67 insertions(+), 47 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index e6688c1ccbb22..23b54e51f5b7a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -39,10 +39,14 @@ object EigenValueDecomposition { * * @param mul a function that multiplies the symmetric matrix with a DenseVector. * @param n dimension of the square matrix (maximum Int.MaxValue). - * @param k number of leading eigenvalues required. + * @param k number of leading eigenvalues required, 0 < k < n. * @param tol tolerance of the eigs computation. * @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors - * (columns of the matrix). The number of computed eigenvalues might be smaller than k. + * (columns of the matrix). + * @note The number of computed eigenvalues might be smaller than k when some Ritz values do not + * satisfy the convergence criterion specified by tol (see ARPACK Users Guide, Chapter 4.6 + * for more details). The maximum number of Arnoldi update iterations is set to 300 in this + * function. */ private[mllib] def symmetricEigs(mul: DenseVector => DenseVector, n: Int, k: Int, tol: Double) : (BDV[Double], BDM[Double]) = { @@ -55,10 +59,10 @@ object EigenValueDecomposition { val tolW = new doubleW(tol) // number of desired eigenvalues, 0 < nev < n val nev = new intW(k) - // nev Lanczos vectors are generated are generated in the first iteration - // ncv-nev Lanczos vectors are generated are generated in each subsequent iteration + // nev Lanczos vectors are generated in the first iteration + // ncv-nev Lanczos vectors are generated in each subsequent iteration // ncv must be smaller than n - val ncv = scala.math.min(2 * k, n) + val ncv = math.min(2 * k, n) // "I" for standard eigenvalue problem, "G" for generalized eigenvalue problem val bmat = "I" @@ -75,7 +79,7 @@ object EigenValueDecomposition { var ido = new intW(0) var info = new intW(0) - var resid:Array[Double] = new Array[Double](n) + var resid = new Array[Double](n) var v = new Array[Double](n * ncv) var workd = new Array[Double](n * 3) var workl = new Array[Double](ncv * (ncv + 8)) @@ -128,19 +132,20 @@ object EigenValueDecomposition { // number of computed eigenvalues, might be smaller than k val computed = iparam(4) - val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map{ + val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map { r => (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n)) } // sort the eigen-pairs in descending order - val sortedEigenPairs = eigenPairs.sortBy(-1 * _._1) + val sortedEigenPairs = eigenPairs.sortBy(- _._1) // copy eigenvectors in descending order of eigenvalues val sortedU = BDM.zeros[Double](n, computed) - sortedEigenPairs.zipWithIndex.map{ + sortedEigenPairs.zipWithIndex.foreach { r => { + val b = r._2 * n for (i <- 0 until n) { - sortedU.data(r._2 * n + i) = r._1._2(i) + sortedU.data(b + i) = r._1._2(i) } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index ecd25c1273cb4..fbe822c53e1ce 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -207,13 +207,14 @@ class RowMatrix( * @param v a local DenseVector whose length must match the number of columns of this matrix. * @return a local DenseVector representing the product. */ - private[mllib] def multiplyGramianMatrix(v: DenseVector): DenseVector = { + private[mllib] def multiplyGramianMatrixBy(v: DenseVector): DenseVector = { val n = numCols().toInt + val vbr = rows.context.broadcast(v.toBreeze) val bv = rows.aggregate(BDV.zeros[Double](n))( seqOp = (U, r) => { val rBrz = r.toBreeze - val a = rBrz.dot(v.toBreeze) + val a = rBrz.dot(vbr.value) rBrz match { case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U) case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U) @@ -223,7 +224,7 @@ class RowMatrix( combOp = (U1, U2) => U1 += U2 ) - new DenseVector(bv.data) + Vectors.fromBreeze(bv).asInstanceOf[DenseVector] } /** @@ -259,7 +260,11 @@ class RowMatrix( k: Int, computeU: Boolean = false, rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { - computeSVD(k, computeU, rCond, 1e-9) + if (numCols() < 100) { + computeSVD(k, computeU, rCond, 1e-9, true) + } else { + computeSVD(k, computeU, rCond, 1e-9, false) + } } /** @@ -274,7 +279,7 @@ class RowMatrix( * The decomposition is computed by providing a function that multiples a vector with A'A to * ARPACK, and iteratively invoking ARPACK-dsaupd on master node, from which we recover S and V. * Then we compute U via easy matrix multiplication as U = A * (V * S^{-1}). - * Note that this approach requires `O(nnz(A))` time. + * Note that this approach requires approximately `O(k * nnz(A))` time. * * ARPACK requires k to be strictly less than n. Thus when the requested eigenvalues k = n, a * non-sparse implementation will be used, which requires `n^2` doubles to fit in memory and @@ -294,21 +299,27 @@ class RowMatrix( * are treated as zero, where sigma(0) is the largest singular value. * @param tol the numerical tolerance of svd computation. Larger tolerance means fewer iterations, * but less accurate result. + * @param isDenseSVD invoke dense SVD implementation when isDenseSVD = true. This requires + * `O(n^2)` memory and `O(n^3)` time. For a skinny matrix (m >> n) with small n, + * dense implementation might be faster. * @return SingularValueDecomposition(U, s, V) */ def computeSVD( k: Int, computeU: Boolean, rCond: Double, - tol: Double): SingularValueDecomposition[RowMatrix, Matrix] = { + tol: Double, + isDenseSVD: Boolean): SingularValueDecomposition[RowMatrix, Matrix] = { val n = numCols().toInt require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") - val (sigmaSquares: BDV[Double], u: BDM[Double]) = if (k < n) { - EigenValueDecomposition.symmetricEigs(multiplyGramianMatrix, n, k, tol) + val (sigmaSquares: BDV[Double], u: BDM[Double]) = if (!isDenseSVD && k < n) { + EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol) } else { - logWarning(s"Request full SVD (k = n = $k), while ARPACK requires k strictly less than n. " + - s"Using non-sparse implementation.") + if (!isDenseSVD && k == n) { + logWarning(s"Request full SVD (k = n = $k), while ARPACK requires k strictly less than " + + s"n. Using non-sparse implementation.") + } val G = computeGramianMatrix() val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) = brzSvd(G.toBreeze.asInstanceOf[BDM[Double]]) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 8014428b02d59..e2ff423ca7f79 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -95,38 +95,42 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { } test("svd of a full-rank matrix") { - for (mat <- Seq(denseMat, sparseMat)) { - val localMat = mat.toBreeze() - val (localU, localSigma, localVt) = brzSvd(localMat) - val localV: BDM[Double] = localVt.t.toDenseMatrix - for (k <- 1 to n) { - val svd = mat.computeSVD(k, computeU = true) - val U = svd.U - val s = svd.s - val V = svd.V - assert(U.numRows() === m) - assert(U.numCols() === k) - assert(s.size === k) - assert(V.numRows === n) - assert(V.numCols === k) - assertColumnEqualUpToSign(U.toBreeze(), localU, k) - assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) - assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) + for (denseSVD <- Seq(true, false)) { + for (mat <- Seq(denseMat, sparseMat)) { + val localMat = mat.toBreeze() + val (localU, localSigma, localVt) = brzSvd(localMat) + val localV: BDM[Double] = localVt.t.toDenseMatrix + for (k <- 1 to n) { + val svd = mat.computeSVD(k, true, 1e-9, 1e-9, denseSVD) + val U = svd.U + val s = svd.s + val V = svd.V + assert(U.numRows() === m) + assert(U.numCols() === k) + assert(s.size === k) + assert(V.numRows === n) + assert(V.numCols === k) + assertColumnEqualUpToSign(U.toBreeze(), localU, k) + assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) + assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) + } + val svdWithoutU = mat.computeSVD(n - 1, false, 1e-9, 1e-9, denseSVD) + assert(svdWithoutU.U === null) } - val svdWithoutU = mat.computeSVD(n - 1) - assert(svdWithoutU.U === null) } } test("svd of a low-rank matrix") { - val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) - val mat = new RowMatrix(rows, 4, 3) - val svd = mat.computeSVD(2, computeU = true) - assert(svd.s.size === 1, "should not return zero singular values") - assert(svd.U.numRows() === 4) - assert(svd.U.numCols() === 1) - assert(svd.V.numRows === 3) - assert(svd.V.numCols === 1) + for (denseSVD <- Seq(true, false)) { + val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) + val mat = new RowMatrix(rows, 4, 3) + val svd = mat.computeSVD(2, true, 1e-9, 1e-9, denseSVD) + assert(svd.s.size === 1, "should not return zero singular values") + assert(svd.U.numRows() === 4) + assert(svd.U.numCols() === 1) + assert(svd.V.numRows === 3) + assert(svd.V.numCols === 1) + } } def closeToZero(G: BDM[Double]): Boolean = { From 5543cce3b7eba1bb3c4b5b8b43ca2c0399295044 Mon Sep 17 00:00:00 2001 From: Li Pu Date: Mon, 23 Jun 2014 16:27:27 -0700 Subject: [PATCH 10/17] improve svd api --- .../linalg/EigenValueDecomposition.scala | 31 ++-- .../mllib/linalg/distributed/RowMatrix.scala | 135 ++++++++++++------ .../linalg/distributed/RowMatrixSuite.scala | 14 +- 3 files changed, 122 insertions(+), 58 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index 23b54e51f5b7a..49419f7654e67 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -25,13 +25,10 @@ import org.apache.spark.annotation.Experimental /** * :: Experimental :: - * Represents eigenvalue decomposition factors. + * Compute eigen-decomposition. */ @Experimental -case class EigenValueDecomposition[VType](s: Vector, V: VType) - -@Experimental -object EigenValueDecomposition { +private[mllib] object EigenValueDecomposition { /** * Compute the leading k eigenvalues and eigenvectors on a symmetric square matrix using ARPACK. * The caller needs to ensure that the input matrix is real symmetric. This function requires @@ -41,6 +38,7 @@ object EigenValueDecomposition { * @param n dimension of the square matrix (maximum Int.MaxValue). * @param k number of leading eigenvalues required, 0 < k < n. * @param tol tolerance of the eigs computation. + * @param maxIterations the maximum number of Arnoldi update iterations. * @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors * (columns of the matrix). * @note The number of computed eigenvalues might be smaller than k when some Ritz values do not @@ -48,8 +46,12 @@ object EigenValueDecomposition { * for more details). The maximum number of Arnoldi update iterations is set to 300 in this * function. */ - private[mllib] def symmetricEigs(mul: DenseVector => DenseVector, n: Int, k: Int, tol: Double) - : (BDV[Double], BDM[Double]) = { + private[mllib] def symmetricEigs( + mul: DenseVector => DenseVector, + n: Int, + k: Int, + tol: Double, + maxIterations: Int): (BDV[Double], BDM[Double]) = { // TODO: remove this function and use eigs in breeze when switching breeze version require(n > k, s"Number of required eigenvalues $k must be smaller than matrix dimension $n") @@ -73,7 +75,7 @@ object EigenValueDecomposition { // use exact shift in each iteration iparam(0) = 1 // maximum number of Arnoldi update iterations, or the actual number of iterations on output - iparam(2) = 300 + iparam(2) = maxIterations // Mode 1: A*x = lambda*x, A symmetric iparam(6) = 1 @@ -132,8 +134,8 @@ object EigenValueDecomposition { // number of computed eigenvalues, might be smaller than k val computed = iparam(4) - val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map { - r => (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n)) + val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map { r => + (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n)) } // sort the eigen-pairs in descending order @@ -141,15 +143,16 @@ object EigenValueDecomposition { // copy eigenvectors in descending order of eigenvalues val sortedU = BDM.zeros[Double](n, computed) - sortedEigenPairs.zipWithIndex.foreach { - r => { + sortedEigenPairs.zipWithIndex.foreach { r => { val b = r._2 * n - for (i <- 0 until n) { + var i = 0 + while (i < n) { sortedU.data(b + i) = r._1._2(i) + i += 1 } } } - (BDV(sortedEigenPairs.map(_._1)), sortedU) + (BDV[Double](sortedEigenPairs.map(_._1)), sortedU) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index fbe822c53e1ce..1c4ae0b0ccafd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -218,6 +218,9 @@ class RowMatrix( rBrz match { case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U) case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U) + case _ => + throw new UnsupportedOperationException( + s"Do not support vector operation from type ${rBrz.getClass.getName}.") } U }, @@ -247,43 +250,70 @@ class RowMatrix( } /** - * Computes the singular value decomposition of this matrix, using default tolerance (1e-9). + * Computes singular value decomposition of this matrix using dense implementation. + * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A ~= U * S * V', + * where S contains the leading singular values, U and V contain the corresponding singular + * vectors. + * + * This approach requires `n^2` doubles to fit in memory and `O(n^3)` time on the master node. + * Further, n should be less than m. For problems with small n (e.g. n < 100 and n << m), the + * dense implementation might be faster than the sparse implementation. + * + * At most k largest non-zero singular values and associated vectors are returned. + * If there are k such values, then the dimensions of the return will be: + * + * U is a RowMatrix of size m x k that satisfies U'U = eye(k), + * s is a Vector of size k, holding the singular values in descending order, + * and V is a Matrix of size n x k that satisfies V'V = eye(k). * - * @param k number of singular values to keep. We might return less than k if there are - * numerically zero singular values. See rCond. - * @param computeU whether to compute U + * @param k number of leading singular values to keep (0 < k <= n). We might return less than + * k if there are numerically zero singular values. See rCond. + * @param computeU whether to compute U. * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) * are treated as zero, where sigma(0) is the largest singular value. * @return SingularValueDecomposition(U, s, V) */ def computeSVD( k: Int, - computeU: Boolean = false, - rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { - if (numCols() < 100) { - computeSVD(k, computeU, rCond, 1e-9, true) - } else { - computeSVD(k, computeU, rCond, 1e-9, false) - } + computeU: Boolean, + rCond: Double): SingularValueDecomposition[RowMatrix, Matrix] = { + val n = numCols().toInt + require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") + val G = computeGramianMatrix() + val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) = + brzSvd(G.toBreeze.asInstanceOf[BDM[Double]]) + computeSVDEffectiveRank(k, n, computeU, rCond, sigmaSquaresFull, uFull) + } + + /** + * Computes singular value decomposition of this matrix using dense implementation with default + * reciprocal condition number (1e-9). See computeSVD for more details. + * + * @param k number of leading singular values to keep (0 < k <= n). We might return less than + * k if there are numerically zero singular values. + * @param computeU whether to compute U. + * @return SingularValueDecomposition(U, s, V) + */ + def computeSVD( + k: Int, + computeU: Boolean = false): SingularValueDecomposition[RowMatrix, Matrix] = { + computeSVD(k, computeU, 1e-9) } /** - * Computes the singular value decomposition of this matrix. + * Computes singular value decomposition of this matrix using sparse implementation. * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A ~= U * S * V', * where S contains the leading singular values, U and V contain the corresponding singular * vectors. * - * There is no restriction on m, but we require `n*(6*k+4)` doubles to fit in memory on the master - * node. Further, n should be less than m. - * * The decomposition is computed by providing a function that multiples a vector with A'A to * ARPACK, and iteratively invoking ARPACK-dsaupd on master node, from which we recover S and V. * Then we compute U via easy matrix multiplication as U = A * (V * S^{-1}). * Note that this approach requires approximately `O(k * nnz(A))` time. * - * ARPACK requires k to be strictly less than n. Thus when the requested eigenvalues k = n, a - * non-sparse implementation will be used, which requires `n^2` doubles to fit in memory and - * `O(n^3)` time on the master node. + * There is no restriction on m, but we require `n*(6*k+4)` doubles to fit in memory on the master + * node. Further, n should be less than m, and ARPACK requires k to be strictly less than n. If + * the requested k = n, please use the dense implementation computeSVD. * * At most k largest non-zero singular values and associated vectors are returned. * If there are k such values, then the dimensions of the return will be: @@ -292,46 +322,69 @@ class RowMatrix( * s is a Vector of size k, holding the singular values in descending order, * and V is a Matrix of size n x k that satisfies V'V = eye(k). * - * @param k number of singular values to keep. We might return less than k if there are - * numerically zero singular values. See rCond. - * @param computeU whether to compute U + * @param k number of leading singular values to keep (0 < k < n). We might return less than + * k if there are numerically zero singular values. See rCond. + * @param computeU whether to compute U. * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) * are treated as zero, where sigma(0) is the largest singular value. - * @param tol the numerical tolerance of svd computation. Larger tolerance means fewer iterations, + * @param tol the numerical tolerance of SVD computation. Larger tolerance means fewer iterations, * but less accurate result. - * @param isDenseSVD invoke dense SVD implementation when isDenseSVD = true. This requires - * `O(n^2)` memory and `O(n^3)` time. For a skinny matrix (m >> n) with small n, - * dense implementation might be faster. + * @param maxIterations the maximum number of Arnoldi update iterations. * @return SingularValueDecomposition(U, s, V) */ - def computeSVD( + def computeSparseSVD( k: Int, computeU: Boolean, rCond: Double, tol: Double, - isDenseSVD: Boolean): SingularValueDecomposition[RowMatrix, Matrix] = { + maxIterations: Int): SingularValueDecomposition[RowMatrix, Matrix] = { val n = numCols().toInt - require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") + require(k > 0 && k < n, s"Request up to n - 1 singular values k=$k n=$n. " + + s"For full SVD (i.e. k = n), please use dense implementation computeSVD.") + val (sigmaSquares: BDV[Double], u: BDM[Double]) = + EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIterations) + computeSVDEffectiveRank(k, n, computeU, rCond, sigmaSquares, u) + } - val (sigmaSquares: BDV[Double], u: BDM[Double]) = if (!isDenseSVD && k < n) { - EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol) - } else { - if (!isDenseSVD && k == n) { - logWarning(s"Request full SVD (k = n = $k), while ARPACK requires k strictly less than " + - s"n. Using non-sparse implementation.") - } - val G = computeGramianMatrix() - val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) = - brzSvd(G.toBreeze.asInstanceOf[BDM[Double]]) - (sigmaSquaresFull, uFull) - } + /** + * Computes singular value decomposition of this matrix using sparse implementation with default + * reciprocal condition number (1e-9), tolerance (1e-10), and maximum number of Arnoldi iterations + * (300). See computeSparseSVD for more details. + * + * @param k number of leading singular values to keep (0 < k < n). We might return less than + * k if there are numerically zero singular values. + * @param computeU whether to compute U. + * @return SingularValueDecomposition(U, s, V) + */ + def computeSparseSVD( + k: Int, + computeU: Boolean = false): SingularValueDecomposition[RowMatrix, Matrix] = { + computeSparseSVD(k, computeU, 1e-9, 1e-10, 300) + } + + /** + * Determine effective rank of SVD result and compute left singular vectors if required. + */ + private def computeSVDEffectiveRank( + k: Int, + n: Int, + computeU: Boolean, + rCond: Double, + sigmaSquares: BDV[Double], + u: BDM[Double]): SingularValueDecomposition[RowMatrix, Matrix] = { val sigmas: BDV[Double] = brzSqrt(sigmaSquares) // Determine effective rank. val sigma0 = sigmas(0) val threshold = rCond * sigma0 var i = 0 - while (i < k && sigmas(i) >= threshold) { + // sigmas might have a length smaller than k, if some Ritz values do not satisfy the + // convergence criterion specified by tol after maxIterations. + // Thus use i < min(k, sigmas.length) instead of i < k + if (sigmas.length < k) { + logWarning(s"Requested $k singular values but only found ${sigmas.length} converged.") + } + while (i < math.min(k, sigmas.length) && sigmas(i) >= threshold) { i += 1 } val sk = i diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index e2ff423ca7f79..23466f2fff1a0 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -101,7 +101,13 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { val (localU, localSigma, localVt) = brzSvd(localMat) val localV: BDM[Double] = localVt.t.toDenseMatrix for (k <- 1 to n) { - val svd = mat.computeSVD(k, true, 1e-9, 1e-9, denseSVD) + val svd = if (k < n) { + if (denseSVD) mat.computeSVD(k, computeU = true) + else mat.computeSparseSVD(k, computeU = true) + } else { + // when k = n, always use dense SVD + mat.computeSVD(k, computeU = true) + } val U = svd.U val s = svd.s val V = svd.V @@ -114,7 +120,8 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) } - val svdWithoutU = mat.computeSVD(n - 1, false, 1e-9, 1e-9, denseSVD) + val svdWithoutU = if (denseSVD) mat.computeSVD(n - 1, computeU = false) + else mat.computeSparseSVD(n - 1, computeU = false) assert(svdWithoutU.U === null) } } @@ -124,7 +131,8 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { for (denseSVD <- Seq(true, false)) { val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) val mat = new RowMatrix(rows, 4, 3) - val svd = mat.computeSVD(2, true, 1e-9, 1e-9, denseSVD) + val svd = if (denseSVD) mat.computeSVD(2, computeU = true) + else mat.computeSparseSVD(2, computeU = true) assert(svd.s.size === 1, "should not return zero singular values") assert(svd.U.numRows() === 4) assert(svd.U.numCols() === 1) From 71484263409c03669be825b50714731fa9c46f6c Mon Sep 17 00:00:00 2001 From: Li Pu Date: Thu, 26 Jun 2014 00:09:48 -0700 Subject: [PATCH 11/17] improve RowMatrix multiply --- .../mllib/linalg/distributed/RowMatrix.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 1c4ae0b0ccafd..66f1a8066525b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -509,15 +509,24 @@ class RowMatrix( */ def multiply(B: Matrix): RowMatrix = { val n = numCols().toInt + val k = B.numCols require(n == B.numRows, s"Dimension mismatch: $n vs ${B.numRows}") require(B.isInstanceOf[DenseMatrix], s"Only support dense matrix at this time but found ${B.getClass.getName}.") - val Bb = rows.context.broadcast(B) + val Bb = rows.context.broadcast(B.toBreeze.asInstanceOf[BDM[Double]].toDenseVector.toArray) val AB = rows.mapPartitions({ iter => - val Bi = Bb.value.toBreeze.asInstanceOf[BDM[Double]] - iter.map(v => Vectors.fromBreeze(Bi.t * v.toBreeze)) + val Bi = Bb.value + iter.map(row => { + val v = BDV.zeros[Double](k) + var i = 0 + while (i < k) { + v(i) = row.toBreeze.dot(new BDV(Bi, i * n, 1, n)) + i += 1 + } + Vectors.fromBreeze(v) + }) }, preservesPartitioning = true) new RowMatrix(AB, nRows, B.numCols) From c2737714b696d3cfae3b1efd0bde6a8d44a47b95 Mon Sep 17 00:00:00 2001 From: Li Pu Date: Mon, 7 Jul 2014 13:49:29 -0700 Subject: [PATCH 12/17] automatically determine SVD compute mode and parameters --- .../mllib/linalg/distributed/RowMatrix.scala | 181 +++++++++--------- .../linalg/distributed/RowMatrixSuite.scala | 20 +- 2 files changed, 104 insertions(+), 97 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 66f1a8066525b..cd9d28a4f976c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -250,70 +250,29 @@ class RowMatrix( } /** - * Computes singular value decomposition of this matrix using dense implementation. - * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A ~= U * S * V', - * where S contains the leading singular values, U and V contain the corresponding singular - * vectors. + * Computes singular value decomposition of this matrix. Denote this matrix by A (m x n), this + * will compute matrices U, S, V such that A ~= U * S * V', where S contains the leading k + * singular values, U and V contain the corresponding singular vectors. * - * This approach requires `n^2` doubles to fit in memory and `O(n^3)` time on the master node. - * Further, n should be less than m. For problems with small n (e.g. n < 100 and n << m), the - * dense implementation might be faster than the sparse implementation. + * This approach assumes n is smaller than m, and invokes a dense matrix implementation when n is + * small (n < 100) or the number of requested singular values is the same as n (k == n). For + * problems with large n (n >= 100) and k < n, this approach invokes a sparse matrix + * implementation that provides a function to ARPACK to multiply a vector with A'A. It iteratively + * calls ARPACK-dsaupd on the master node, from which we recover S and V. Then we compute U via + * easy matrix multiplication as U = A * (V * S^{-1}). * - * At most k largest non-zero singular values and associated vectors are returned. - * If there are k such values, then the dimensions of the return will be: + * The dense implementation requires `n^2` doubles to fit in memory and `O(n^3)` time on the + * master node. * - * U is a RowMatrix of size m x k that satisfies U'U = eye(k), - * s is a Vector of size k, holding the singular values in descending order, - * and V is a Matrix of size n x k that satisfies V'V = eye(k). + * The sparse implementation requires `n * (6 * k + 4)` doubles to fit in memory on the master + * node and approximately `O(k * nnz(A))` time distributed on all worker nodes. There is no + * restriction on m (number of rows). * - * @param k number of leading singular values to keep (0 < k <= n). We might return less than - * k if there are numerically zero singular values. See rCond. - * @param computeU whether to compute U. - * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) - * are treated as zero, where sigma(0) is the largest singular value. - * @return SingularValueDecomposition(U, s, V) - */ - def computeSVD( - k: Int, - computeU: Boolean, - rCond: Double): SingularValueDecomposition[RowMatrix, Matrix] = { - val n = numCols().toInt - require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") - val G = computeGramianMatrix() - val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) = - brzSvd(G.toBreeze.asInstanceOf[BDM[Double]]) - computeSVDEffectiveRank(k, n, computeU, rCond, sigmaSquaresFull, uFull) - } - - /** - * Computes singular value decomposition of this matrix using dense implementation with default - * reciprocal condition number (1e-9). See computeSVD for more details. - * - * @param k number of leading singular values to keep (0 < k <= n). We might return less than - * k if there are numerically zero singular values. - * @param computeU whether to compute U. - * @return SingularValueDecomposition(U, s, V) - */ - def computeSVD( - k: Int, - computeU: Boolean = false): SingularValueDecomposition[RowMatrix, Matrix] = { - computeSVD(k, computeU, 1e-9) - } - - /** - * Computes singular value decomposition of this matrix using sparse implementation. - * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A ~= U * S * V', - * where S contains the leading singular values, U and V contain the corresponding singular - * vectors. - * - * The decomposition is computed by providing a function that multiples a vector with A'A to - * ARPACK, and iteratively invoking ARPACK-dsaupd on master node, from which we recover S and V. - * Then we compute U via easy matrix multiplication as U = A * (V * S^{-1}). - * Note that this approach requires approximately `O(k * nnz(A))` time. - * - * There is no restriction on m, but we require `n*(6*k+4)` doubles to fit in memory on the master - * node. Further, n should be less than m, and ARPACK requires k to be strictly less than n. If - * the requested k = n, please use the dense implementation computeSVD. + * Several internal parameters are set to default values. The reciprocal condition number rCond + * is set to 1e-9. All singular values smaller than rCond * sigma(0) are treated as zeros, where + * sigma(0) is the largest singular value. The maximum number of Arnoldi update iterations for + * ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK + * eigen-decomposition is set to 1e-10. * * At most k largest non-zero singular values and associated vectors are returned. * If there are k such values, then the dimensions of the return will be: @@ -322,44 +281,86 @@ class RowMatrix( * s is a Vector of size k, holding the singular values in descending order, * and V is a Matrix of size n x k that satisfies V'V = eye(k). * - * @param k number of leading singular values to keep (0 < k < n). We might return less than - * k if there are numerically zero singular values. See rCond. + * @param k number of leading singular values to keep (0 < k <= n). It might return less than + * k if there are numerically zero singular values or there are not enough Ritz values + * converged before the maximum number of Arnoldi update iterations is reached (in case + * that matrix A is ill-conditioned). * @param computeU whether to compute U. * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) * are treated as zero, where sigma(0) is the largest singular value. - * @param tol the numerical tolerance of SVD computation. Larger tolerance means fewer iterations, - * but less accurate result. - * @param maxIterations the maximum number of Arnoldi update iterations. - * @return SingularValueDecomposition(U, s, V) + * @return SingularValueDecomposition(U, s, V), U = null if computeU = false */ - def computeSparseSVD( - k: Int, - computeU: Boolean, - rCond: Double, - tol: Double, - maxIterations: Int): SingularValueDecomposition[RowMatrix, Matrix] = { - val n = numCols().toInt - require(k > 0 && k < n, s"Request up to n - 1 singular values k=$k n=$n. " + - s"For full SVD (i.e. k = n), please use dense implementation computeSVD.") - val (sigmaSquares: BDV[Double], u: BDM[Double]) = - EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIterations) - computeSVDEffectiveRank(k, n, computeU, rCond, sigmaSquares, u) + def computeSVD(k: Int, + computeU: Boolean = false, + rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { + // maximum number of Arnoldi update iterations for invoking ARPACK + val maxIter = math.max(300, k * 3) + // numerical tolerance for invoking ARPACK + val tol = 1e-10 + computeSVD(k, computeU, rCond, maxIter, tol, "auto") } /** - * Computes singular value decomposition of this matrix using sparse implementation with default - * reciprocal condition number (1e-9), tolerance (1e-10), and maximum number of Arnoldi iterations - * (300). See computeSparseSVD for more details. - * - * @param k number of leading singular values to keep (0 < k < n). We might return less than - * k if there are numerically zero singular values. - * @param computeU whether to compute U. - * @return SingularValueDecomposition(U, s, V) + * Actual SVD computation, visible for testing. */ - def computeSparseSVD( - k: Int, - computeU: Boolean = false): SingularValueDecomposition[RowMatrix, Matrix] = { - computeSparseSVD(k, computeU, 1e-9, 1e-10, 300) + private[mllib] def computeSVD(k: Int, + computeU: Boolean, + rCond: Double, + maxIter: Int, + tol: Double, + mode: String): SingularValueDecomposition[RowMatrix, Matrix] = { + val n = numCols().toInt + + object SVDMode extends Enumeration { + val DenseARPACK, DenseLAPACK, SparseARPACK = Value + } + + val derivedMode = mode match { + case "auto" => if (n < 100 || k == n) { + // invoke dense implementation when n is small or k == n (since ARPACK requires k < n) + require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") + "dense" + } else { + // invoke sparse implementation with ARPACK when n is large + require(k > 0 && k < n, s"Request up to n - 1 singular values for ARPACK k=$k n=$n.") + "sparse" + } + case "dense" => "dense" + case "sparse" => "sparse" + case _ => throw new IllegalArgumentException(s"Do not support mode $mode.") + } + + val computeMode = derivedMode match { + case "dense" => if (k < n / 2) { + // when k is small, call ARPACK + require(k > 0 && k < n, s"Request up to n - 1 singular values for ARPACK k=$k n=$n.") + SVDMode.DenseARPACK + } else { + // when k is large, call LAPACK + SVDMode.DenseLAPACK + } + case "sparse" => SVDMode.SparseARPACK + } + + val (sigmaSquares: BDV[Double], u: BDM[Double]) = computeMode match { + case SVDMode.DenseARPACK => { + val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]] + def multiplyDenseGramianMatrixBy(v: DenseVector): DenseVector = { + Vectors.fromBreeze(G * v.toBreeze).asInstanceOf[DenseVector] + } + EigenValueDecomposition.symmetricEigs(multiplyDenseGramianMatrixBy, n, k, tol, maxIter) + } + case SVDMode.DenseLAPACK => { + val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]] + val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) = brzSvd(G) + (sigmaSquaresFull, uFull) + } + case SVDMode.SparseARPACK => { + EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter) + } + } + + computeSVDEffectiveRank(k, n, computeU, rCond, sigmaSquares, u) } /** diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 23466f2fff1a0..3091d1f967cf5 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -102,11 +102,14 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { val localV: BDM[Double] = localVt.t.toDenseMatrix for (k <- 1 to n) { val svd = if (k < n) { - if (denseSVD) mat.computeSVD(k, computeU = true) - else mat.computeSparseSVD(k, computeU = true) + if (denseSVD) { + mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode = "dense") + } else { + mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode = "sparse") + } } else { // when k = n, always use dense SVD - mat.computeSVD(k, computeU = true) + mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode = "dense") } val U = svd.U val s = svd.s @@ -120,8 +123,11 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) } - val svdWithoutU = if (denseSVD) mat.computeSVD(n - 1, computeU = false) - else mat.computeSparseSVD(n - 1, computeU = false) + val svdWithoutU = if (denseSVD) { + mat.computeSVD(n - 1, computeU = false, 1e-9, 300, 1e-10, mode = "dense") + } else { + mat.computeSVD(n - 1, computeU = false, 1e-9, 300, 1e-10, mode = "sparse") + } assert(svdWithoutU.U === null) } } @@ -131,8 +137,8 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { for (denseSVD <- Seq(true, false)) { val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) val mat = new RowMatrix(rows, 4, 3) - val svd = if (denseSVD) mat.computeSVD(2, computeU = true) - else mat.computeSparseSVD(2, computeU = true) + val svd = if (denseSVD) mat.computeSVD(2, computeU = true, 1e-9, 300, 1e-10, mode = "dense") + else mat.computeSVD(2, computeU = true, 1e-9, 300, 1e-10, mode = "sparse") assert(svd.s.size === 1, "should not return zero singular values") assert(svd.U.numRows() === 4) assert(svd.U.numCols() === 1) From 62969fa4e06a715025483ed282b29427075bbbf1 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Jul 2014 17:54:54 -0700 Subject: [PATCH 13/17] use BDV directly in symmetricEigs change the computation mode to local-svd, local-eigs, and dist-eigs update tests and docs --- .../linalg/EigenValueDecomposition.scala | 21 +- .../mllib/linalg/distributed/RowMatrix.scala | 183 +++++++++--------- .../linalg/distributed/RowMatrixSuite.scala | 55 +++--- 3 files changed, 120 insertions(+), 139 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index 49419f7654e67..3515461b52493 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -47,7 +47,7 @@ private[mllib] object EigenValueDecomposition { * function. */ private[mllib] def symmetricEigs( - mul: DenseVector => DenseVector, + mul: BDV[Double] => BDV[Double], n: Int, k: Int, tol: Double, @@ -102,9 +102,9 @@ private[mllib] object EigenValueDecomposition { // multiply working vector with the matrix val inputOffset = ipntr(0) - 1 val outputOffset = ipntr(1) - 1 - val x = w(inputOffset until inputOffset + n) - val y = w(outputOffset until outputOffset + n) - y := BDV(mul(Vectors.fromBreeze(x).asInstanceOf[DenseVector]).toArray) + val x = w.slice(inputOffset, inputOffset + n) + val y = w.slice(outputOffset, outputOffset + n) + y := mul(x) // call ARPACK's reverse communication arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd, workl, workl.length, info) @@ -143,13 +143,12 @@ private[mllib] object EigenValueDecomposition { // copy eigenvectors in descending order of eigenvalues val sortedU = BDM.zeros[Double](n, computed) - sortedEigenPairs.zipWithIndex.foreach { r => { - val b = r._2 * n - var i = 0 - while (i < n) { - sortedU.data(b + i) = r._1._2(i) - i += 1 - } + sortedEigenPairs.zipWithIndex.foreach { r => + val b = r._2 * n + var i = 0 + while (i < n) { + sortedU.data(b + i) = r._1._2(i) + i += 1 } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index cd9d28a4f976c..29c0adc51fe2b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.linalg.distributed -import java.util +import java.util.Arrays import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV} import breeze.linalg.{svd => brzSvd, axpy => brzAxpy} @@ -207,9 +207,9 @@ class RowMatrix( * @param v a local DenseVector whose length must match the number of columns of this matrix. * @return a local DenseVector representing the product. */ - private[mllib] def multiplyGramianMatrixBy(v: DenseVector): DenseVector = { + private[mllib] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = { val n = numCols().toInt - val vbr = rows.context.broadcast(v.toBreeze) + val vbr = rows.context.broadcast(v) val bv = rows.aggregate(BDV.zeros[Double](n))( seqOp = (U, r) => { @@ -227,7 +227,7 @@ class RowMatrix( combOp = (U1, U2) => U1 += U2 ) - Vectors.fromBreeze(bv).asInstanceOf[DenseVector] + bv } /** @@ -250,49 +250,51 @@ class RowMatrix( } /** - * Computes singular value decomposition of this matrix. Denote this matrix by A (m x n), this + * Computes singular value decomposition of this matrix. Denote this matrix by A (m x n). This * will compute matrices U, S, V such that A ~= U * S * V', where S contains the leading k * singular values, U and V contain the corresponding singular vectors. * - * This approach assumes n is smaller than m, and invokes a dense matrix implementation when n is - * small (n < 100) or the number of requested singular values is the same as n (k == n). For - * problems with large n (n >= 100) and k < n, this approach invokes a sparse matrix - * implementation that provides a function to ARPACK to multiply a vector with A'A. It iteratively - * calls ARPACK-dsaupd on the master node, from which we recover S and V. Then we compute U via - * easy matrix multiplication as U = A * (V * S^{-1}). + * At most k largest non-zero singular values and associated vectors are returned. If there are k + * such values, then the dimensions of the return will be: + * - U is a RowMatrix of size m x k that satisfies U' * U = eye(k), + * - s is a Vector of size k, holding the singular values in descending order, + * - V is a Matrix of size n x k that satisfies V' * V = eye(k). * - * The dense implementation requires `n^2` doubles to fit in memory and `O(n^3)` time on the - * master node. - * - * The sparse implementation requires `n * (6 * k + 4)` doubles to fit in memory on the master - * node and approximately `O(k * nnz(A))` time distributed on all worker nodes. There is no - * restriction on m (number of rows). + * We assume n is smaller than m. The singular values and the right singular vectors are derived + * from the eigenvalues and the eigenvectors of the Gramian matrix A' * A. U, the matrix + * storing the right singular vectors, is computed via matrix multiplication as + * U = A * (V * S^{-1}), if requested by user. The actual method to use is determined + * automatically based on the cost: + * - If n is small (n < 100) or k is large compared with n (k > n / 2), we compute the Gramian + * matrix first and then compute its top eigenvalues and eigenvectors locally on the driver. + * This requires a single pass with O(n^2) storage on each executor and on the driver, and + * O(n^2 k) time on the driver. + * - Otherwise, we compute (A' * A) * v in a distributive way and send it to ARPACK's DSAUPD to + * compute (A' * A)'s top eigenvalues and eigenvectors on the driver node. This requires O(k) + * passes, O(n) storage on each executor, and O(n k) storage on the driver. * * Several internal parameters are set to default values. The reciprocal condition number rCond * is set to 1e-9. All singular values smaller than rCond * sigma(0) are treated as zeros, where * sigma(0) is the largest singular value. The maximum number of Arnoldi update iterations for - * ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK + * ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK's * eigen-decomposition is set to 1e-10. * - * At most k largest non-zero singular values and associated vectors are returned. - * If there are k such values, then the dimensions of the return will be: - * - * U is a RowMatrix of size m x k that satisfies U'U = eye(k), - * s is a Vector of size k, holding the singular values in descending order, - * and V is a Matrix of size n x k that satisfies V'V = eye(k). + * @note The conditions that decide which method to use internally and the default parameters are + * subject to change. * - * @param k number of leading singular values to keep (0 < k <= n). It might return less than - * k if there are numerically zero singular values or there are not enough Ritz values + * @param k number of leading singular values to keep (0 < k <= n). It might return less than k if + * there are numerically zero singular values or there are not enough Ritz values * converged before the maximum number of Arnoldi update iterations is reached (in case * that matrix A is ill-conditioned). - * @param computeU whether to compute U. + * @param computeU whether to compute U * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) * are treated as zero, where sigma(0) is the largest singular value. - * @return SingularValueDecomposition(U, s, V), U = null if computeU = false + * @return SingularValueDecomposition(U, s, V). U = null if computeU = false. */ - def computeSVD(k: Int, - computeU: Boolean = false, - rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { + def computeSVD( + k: Int, + computeU: Boolean = false, + rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { // maximum number of Arnoldi update iterations for invoking ARPACK val maxIter = math.max(300, k * 3) // numerical tolerance for invoking ARPACK @@ -301,87 +303,78 @@ class RowMatrix( } /** - * Actual SVD computation, visible for testing. + * The actual SVD implementation, visible for testing. + * + * @param k number of leading singular values to keep (0 < k <= n) + * @param computeU whether to compute U + * @param rCond the reciprocal condition number + * @param maxIter max number of iterations (if ARPACK is used) + * @param tol termination tolerance (if ARPACK is used) + * @param mode computation mode (auto: determine automatically which mode to use, + * local-svd: compute gram matrix and computes its full SVD locally, + * local-eigs: compute gram matrix and computes its top eigenvalues locally, + * dist-eigs: compute the top eigenvalues of the gram matrix distributively) + * @return SingularValueDecomposition(U, s, V). U = null if computeU = false. */ - private[mllib] def computeSVD(k: Int, - computeU: Boolean, - rCond: Double, - maxIter: Int, - tol: Double, - mode: String): SingularValueDecomposition[RowMatrix, Matrix] = { + private[mllib] def computeSVD( + k: Int, + computeU: Boolean, + rCond: Double, + maxIter: Int, + tol: Double, + mode: String): SingularValueDecomposition[RowMatrix, Matrix] = { val n = numCols().toInt + require(k > 0 && k <= n, s"Request up to n singular values but got k=$k and n=$n.") object SVDMode extends Enumeration { - val DenseARPACK, DenseLAPACK, SparseARPACK = Value + val LocalARPACK, LocalLAPACK, DistARPACK = Value } - val derivedMode = mode match { - case "auto" => if (n < 100 || k == n) { - // invoke dense implementation when n is small or k == n (since ARPACK requires k < n) - require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") - "dense" - } else { - // invoke sparse implementation with ARPACK when n is large - require(k > 0 && k < n, s"Request up to n - 1 singular values for ARPACK k=$k n=$n.") - "sparse" - } - case "dense" => "dense" - case "sparse" => "sparse" + val computeMode = mode match { + case "auto" => + // TODO: The conditions below are not fully tested. + if (n < 100 || k > n / 2) { + // If n is small or k is large compared with n, we better compute the Gramian matrix first + // and then compute its eigenvalues locally, instead of making multiple passes. + if (k < n / 3) { + SVDMode.LocalARPACK + } else { + SVDMode.LocalLAPACK + } + } else { + // If k is small compared with n, we use ARPACK with distributed multiplication. + SVDMode.DistARPACK + } + case "local-svd" => SVDMode.LocalLAPACK + case "local-eigs" => SVDMode.LocalARPACK + case "dist-eigs" => SVDMode.DistARPACK case _ => throw new IllegalArgumentException(s"Do not support mode $mode.") } - val computeMode = derivedMode match { - case "dense" => if (k < n / 2) { - // when k is small, call ARPACK - require(k > 0 && k < n, s"Request up to n - 1 singular values for ARPACK k=$k n=$n.") - SVDMode.DenseARPACK - } else { - // when k is large, call LAPACK - SVDMode.DenseLAPACK - } - case "sparse" => SVDMode.SparseARPACK - } - + // Compute the eigen-decomposition of A' * A. val (sigmaSquares: BDV[Double], u: BDM[Double]) = computeMode match { - case SVDMode.DenseARPACK => { + case SVDMode.LocalARPACK => + require(k < n, s"k must be smaller than n in local-eigs mode but got k=$k and n=$n.") val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]] - def multiplyDenseGramianMatrixBy(v: DenseVector): DenseVector = { - Vectors.fromBreeze(G * v.toBreeze).asInstanceOf[DenseVector] - } - EigenValueDecomposition.symmetricEigs(multiplyDenseGramianMatrixBy, n, k, tol, maxIter) - } - case SVDMode.DenseLAPACK => { + EigenValueDecomposition.symmetricEigs(v => G * v, n, k, tol, maxIter) + case SVDMode.LocalLAPACK => val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]] - val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) = brzSvd(G) + val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G) (sigmaSquaresFull, uFull) - } - case SVDMode.SparseARPACK => { + case SVDMode.DistARPACK => + require(k < n, s"k must be smaller than n in dist-eigs mode but got k=$k and n=$n.") EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter) - } } - computeSVDEffectiveRank(k, n, computeU, rCond, sigmaSquares, u) - } - - /** - * Determine effective rank of SVD result and compute left singular vectors if required. - */ - private def computeSVDEffectiveRank( - k: Int, - n: Int, - computeU: Boolean, - rCond: Double, - sigmaSquares: BDV[Double], - u: BDM[Double]): SingularValueDecomposition[RowMatrix, Matrix] = { val sigmas: BDV[Double] = brzSqrt(sigmaSquares) - // Determine effective rank. + // Determine the effective rank. val sigma0 = sigmas(0) val threshold = rCond * sigma0 var i = 0 - // sigmas might have a length smaller than k, if some Ritz values do not satisfy the - // convergence criterion specified by tol after maxIterations. - // Thus use i < min(k, sigmas.length) instead of i < k + // sigmas might have a length smaller than k, if some Ritz values do not satisfy the convergence + // criterion specified by tol after max number of iterations. + // Thus use i < min(k, sigmas.length) instead of i < k. if (sigmas.length < k) { logWarning(s"Requested $k singular values but only found ${sigmas.length} converged.") } @@ -394,12 +387,12 @@ class RowMatrix( logWarning(s"Requested $k singular values but only found $sk nonzeros.") } - val s = Vectors.dense(util.Arrays.copyOfRange(sigmas.data, 0, sk)) - val V = Matrices.dense(n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk)) + val s = Vectors.dense(Arrays.copyOfRange(sigmas.data, 0, sk)) + val V = Matrices.dense(n, sk, Arrays.copyOfRange(u.data, 0, n * sk)) if (computeU) { // N = Vk * Sk^{-1} - val N = new BDM[Double](n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk)) + val N = new BDM[Double](n, sk, Arrays.copyOfRange(u.data, 0, n * sk)) var i = 0 var j = 0 while (j < sk) { @@ -484,7 +477,7 @@ class RowMatrix( if (k == n) { Matrices.dense(n, k, u.data) } else { - Matrices.dense(n, k, util.Arrays.copyOfRange(u.data, 0, n * k)) + Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 3091d1f967cf5..a961f89456a18 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -95,51 +95,40 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { } test("svd of a full-rank matrix") { - for (denseSVD <- Seq(true, false)) { - for (mat <- Seq(denseMat, sparseMat)) { + for (mat <- Seq(denseMat, sparseMat)) { + for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) { val localMat = mat.toBreeze() val (localU, localSigma, localVt) = brzSvd(localMat) val localV: BDM[Double] = localVt.t.toDenseMatrix for (k <- 1 to n) { - val svd = if (k < n) { - if (denseSVD) { - mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode = "dense") - } else { - mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode = "sparse") - } - } else { - // when k = n, always use dense SVD - mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode = "dense") + val skip = (mode == "local-eigs" || mode == "dist-eigs") && k == n + if (!skip) { + val svd = mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode) + val U = svd.U + val s = svd.s + val V = svd.V + assert(U.numRows() === m) + assert(U.numCols() === k) + assert(s.size === k) + assert(V.numRows === n) + assert(V.numCols === k) + assertColumnEqualUpToSign(U.toBreeze(), localU, k) + assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) + assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) } - val U = svd.U - val s = svd.s - val V = svd.V - assert(U.numRows() === m) - assert(U.numCols() === k) - assert(s.size === k) - assert(V.numRows === n) - assert(V.numCols === k) - assertColumnEqualUpToSign(U.toBreeze(), localU, k) - assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) - assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) - } - val svdWithoutU = if (denseSVD) { - mat.computeSVD(n - 1, computeU = false, 1e-9, 300, 1e-10, mode = "dense") - } else { - mat.computeSVD(n - 1, computeU = false, 1e-9, 300, 1e-10, mode = "sparse") } + val svdWithoutU = mat.computeSVD(1, computeU = false, 1e-9, 300, 1e-10, mode) assert(svdWithoutU.U === null) } } } test("svd of a low-rank matrix") { - for (denseSVD <- Seq(true, false)) { - val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) - val mat = new RowMatrix(rows, 4, 3) - val svd = if (denseSVD) mat.computeSVD(2, computeU = true, 1e-9, 300, 1e-10, mode = "dense") - else mat.computeSVD(2, computeU = true, 1e-9, 300, 1e-10, mode = "sparse") - assert(svd.s.size === 1, "should not return zero singular values") + val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) + val mat = new RowMatrix(rows, 4, 3) + for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) { + val svd = mat.computeSVD(2, computeU = true, 1e-6, 300, 1e-10, mode) + assert(svd.s.size === 1, s"should not return zero singular values but got ${svd.s}") assert(svd.U.numRows() === 4) assert(svd.U.numCols() === 1) assert(svd.V.numRows === 3) From 861ec48bc74616b47d45ad3b828097a35045050f Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Jul 2014 18:09:23 -0700 Subject: [PATCH 14/17] simplify axpy --- .../mllib/linalg/distributed/RowMatrix.scala | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 29c0adc51fe2b..67d79109cda00 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -202,32 +202,23 @@ class RowMatrix( } /** - * Multiply the Gramian matrix `A^T A` by a DenseVector on the right. + * Multiplies the Gramian matrix `A^T A` by a dense vector on the right without computing `A^T A`. * - * @param v a local DenseVector whose length must match the number of columns of this matrix. - * @return a local DenseVector representing the product. + @param v a dense vector whose length must match the number of columns of this matrix + * @return a dense vector representing the product */ private[mllib] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = { val n = numCols().toInt val vbr = rows.context.broadcast(v) - - val bv = rows.aggregate(BDV.zeros[Double](n))( + rows.aggregate(BDV.zeros[Double](n))( seqOp = (U, r) => { val rBrz = r.toBreeze val a = rBrz.dot(vbr.value) - rBrz match { - case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U) - case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U) - case _ => - throw new UnsupportedOperationException( - s"Do not support vector operation from type ${rBrz.getClass.getName}.") - } + brzAxpy(a, rBrz, U.asInstanceOf[BV[Double]]) U }, combOp = (U1, U2) => U1 += U2 ) - - bv } /** From a461082d98828501eccfbb59c8813c5fbd2ef826 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Jul 2014 18:43:18 -0700 Subject: [PATCH 15/17] make superscript show up correctly in doc --- .../apache/spark/mllib/linalg/distributed/RowMatrix.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 67d79109cda00..e003bf17178e5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -254,12 +254,12 @@ class RowMatrix( * We assume n is smaller than m. The singular values and the right singular vectors are derived * from the eigenvalues and the eigenvectors of the Gramian matrix A' * A. U, the matrix * storing the right singular vectors, is computed via matrix multiplication as - * U = A * (V * S^{-1}), if requested by user. The actual method to use is determined + * U = A * (V * S^-1^), if requested by user. The actual method to use is determined * automatically based on the cost: * - If n is small (n < 100) or k is large compared with n (k > n / 2), we compute the Gramian * matrix first and then compute its top eigenvalues and eigenvectors locally on the driver. - * This requires a single pass with O(n^2) storage on each executor and on the driver, and - * O(n^2 k) time on the driver. + * This requires a single pass with O(n^2^) storage on each executor and on the driver, and + * O(n^2^ k) time on the driver. * - Otherwise, we compute (A' * A) * v in a distributive way and send it to ARPACK's DSAUPD to * compute (A' * A)'s top eigenvalues and eigenvectors on the driver node. This requires O(k) * passes, O(n) storage on each executor, and O(n k) storage on the driver. From 7312ec10b1be13a41e46c4b8d164302c8497514a Mon Sep 17 00:00:00 2001 From: Li Pu Date: Wed, 9 Jul 2014 00:35:20 -0700 Subject: [PATCH 16/17] very minor comment fix --- .../org/apache/spark/mllib/linalg/distributed/RowMatrix.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index e003bf17178e5..ce0505aefad5d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -35,7 +35,7 @@ import org.apache.spark.mllib.stat.MultivariateStatisticalSummary * [[org.apache.spark.mllib.stat.MultivariateStatisticalSummary]] * together with add() and merge() function. * A numerically stable algorithm is implemented to compute sample mean and variance: - *[[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]]. + * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]]. * Zero elements (including explicit zero values) are skipped when calling add() and merge(), * to have time complexity O(nnz) instead of O(n) for each column. */ @@ -204,7 +204,7 @@ class RowMatrix( /** * Multiplies the Gramian matrix `A^T A` by a dense vector on the right without computing `A^T A`. * - @param v a dense vector whose length must match the number of columns of this matrix + * @param v a dense vector whose length must match the number of columns of this matrix * @return a dense vector representing the product */ private[mllib] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = { From 6fb01a31ad967b849f5b738f22a64f8616d3177b Mon Sep 17 00:00:00 2001 From: Li Pu Date: Fri, 11 Jul 2014 16:12:43 -0700 Subject: [PATCH 17/17] use specialized axpy in RowMatrix --- .../apache/spark/mllib/linalg/distributed/RowMatrix.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 99cb6516e065c..93eff5f62a6e9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -214,7 +214,13 @@ class RowMatrix( seqOp = (U, r) => { val rBrz = r.toBreeze val a = rBrz.dot(vbr.value) - brzAxpy(a, rBrz, U.asInstanceOf[BV[Double]]) + rBrz match { + // use specialized axpy for better performance + case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U) + case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U) + case _ => throw new UnsupportedOperationException( + s"Do not support vector operation from type ${rBrz.getClass.getName}.") + } U }, combOp = (U1, U2) => U1 += U2