From f7a316e75de75aa989c36d3f98ec389f6c17f0d4 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Mon, 24 Mar 2014 22:00:13 -0700 Subject: [PATCH 1/5] new file --- .../spark/mllib/linalg/MatrixAlgebra.scala | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala new file mode 100644 index 0000000000000..e6d34c5f5d647 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} + +/** + * Square a given matrix efficienty + */ +object MatrixSquare { +/** + * TODO: javadoc Square a given matrix efficienty + */ + def squareWithDIMSUM(matrix: RDD[Array[Double]], + colMags: Array[Double], double: gamma): + Array[Array[Double]] = { + val n = matrix.first.size + + if (k < 1 || k > n) { + throw new IllegalArgumentException( + "Request up to n singular values k=$k n=$n") + } + + // Compute A^T A + val fullata = matrix.mapPartitions { + iter => + val localATA = Array.ofDim[Double](n, n) + while (iter.hasNext) { + val row = iter.next() + var i = 0 + while (i < n) { + var j = 0 + while (j < n) { + localATA(i)(j) += row(i) * row(j) + j += 1 + } + i += 1 + } + } + Iterator(localATA) + }.fold(Array.ofDim[Double](n, n)) { + (a, b) => + var i = 0 + while (i < n) { + var j = 0 + while (j < n) { + a(i)(j) += b(i)(j) + j += 1 + } + i += 1 + } + a + } + fullata + } + + +/** + * TODO: javadoc Square a tall skinny A^TA given matrix efficienty + */ + def square(matrix: RDD[Array[Double]]) : Array[Array[Double]] = { + val n = matrix.first.size + + if (k < 1 || k > n) { + throw new IllegalArgumentException( + "Request up to n singular values k=$k n=$n") + } + + // Compute A^T A + val fullata = matrix.mapPartitions { + iter => + val localATA = Array.ofDim[Double](n, n) + while (iter.hasNext) { + val row = iter.next() + var i = 0 + while (i < n) { + var j = 0 + while (j < n) { + localATA(i)(j) += row(i) * row(j) + j += 1 + } + i += 1 + } + } + Iterator(localATA) + }.fold(Array.ofDim[Double](n, n)) { + (a, b) => + var i = 0 + while (i < n) { + var j = 0 + while (j < n) { + a(i)(j) += b(i)(j) + j += 1 + } + i += 1 + } + a + } + fullata + } +} + From 823e265fe45a64739bf0bb7869174c78c6ddcbc2 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Fri, 28 Mar 2014 15:12:56 -0700 Subject: [PATCH 2/5] test for dimsum --- .../spark/mllib/linalg/MatrixAlgebra.scala | 77 ++++++++++++------- .../org/apache/spark/mllib/linalg/SVD.scala | 54 ++++++------- .../apache/spark/mllib/linalg/SVDSuite.scala | 31 ++++++++ 3 files changed, 104 insertions(+), 58 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala index e6d34c5f5d647..a2ebf7052fc66 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala @@ -17,27 +17,42 @@ package org.apache.spark.mllib.linalg -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD -import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} +import org.jblas.DoubleMatrix /** - * Square a given matrix efficienty + * Efficient matrix operations. + * For example square a given matrix efficiently */ -object MatrixSquare { -/** - * TODO: javadoc Square a given matrix efficienty - */ - def squareWithDIMSUM(matrix: RDD[Array[Double]], - colMags: Array[Double], double: gamma): - Array[Array[Double]] = { +object MatrixAlgebra { + + def columnMagnitudes(matrix: RDD[Array[Double]]): + Array[Double] = { + val n = matrix.first.size + matrix.map { + x => + val a = new DoubleMatrix(x) + a.mul(a).data + }.fold(Array.ofDim[Double](n)) { + (a, b) => + val am = new DoubleMatrix(a) + val bm = new DoubleMatrix(b) + am.addi(bm) + a + } + } + + + /** + * TODO: javadoc Square a given matrix efficienty + */ + def squareWithDIMSUM(matrix: RDD[Array[Double]], colMags: Array[Double], gamma: Double): + Array[Array[Double]] = { val n = matrix.first.size - if (k < 1 || k > n) { - throw new IllegalArgumentException( - "Request up to n singular values k=$k n=$n") + if (gamma < 1) { + throw new IllegalArgumentException("Oversampling should be greater than 1: $gamma") } // Compute A^T A @@ -48,10 +63,14 @@ object MatrixSquare { val row = iter.next() var i = 0 while (i < n) { - var j = 0 - while (j < n) { - localATA(i)(j) += row(i) * row(j) - j += 1 + if(Math.random < gamma / colMags(i)) { + var j = i + 1 + while (j < n) { + val mult = row(i) * row(j) + localATA(i)(j) += mult + localATA(j)(i) += mult + j += 1 + } } i += 1 } @@ -70,21 +89,25 @@ object MatrixSquare { } a } + + // undo normalization + for(i <- 0 until n) for(j <- i until n) { + fullata(i)(j) = if (i == j) colMags(i) + else if (gamma / colMags(i) > 1) fullata(i)(j) + else fullata(i)(j) * colMags(i) / gamma + fullata(j)(i) = fullata(i)(j) + } + fullata } -/** - * TODO: javadoc Square a tall skinny A^TA given matrix efficienty - */ - def square(matrix: RDD[Array[Double]]) : Array[Array[Double]] = { + /** + * TODO: javadoc Square a tall skinny A^TA given matrix efficienty + */ + def square(matrix: RDD[Array[Double]]): Array[Array[Double]] = { val n = matrix.first.size - if (k < 1 || k > n) { - throw new IllegalArgumentException( - "Request up to n singular values k=$k n=$n") - } - // Compute A^T A val fullata = matrix.mapPartitions { iter => diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala index 3e7cc648d1d37..030921adbc3d0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala @@ -29,11 +29,16 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} class SVD { private var k = 1 private var computeU = true - + // All singular values smaller than rCond * sigma(0) // are treated as zero, where sigma(0) is the largest singular value. private var rCond = 1e-9 + // Flag for using DIMSUM in A^TA computation + // See http://arxiv.org/abs/1304.1467 + private var useDIMS = false + private var gamma = 10.0 + /** * Set the number of top-k singular vectors to return */ @@ -60,6 +65,17 @@ class SVD { this } + /** + * Should DIMSUM be used for computing A^TA? + * See http://arxiv.org/abs/1304.1467 + */ + def useDIMSUM(gamma: Double): SVD = { + this.useDIMS = true + this.gamma = gamma + this + } + + /** * Compute SVD using the current set parameters */ @@ -183,36 +199,12 @@ class SVD { } // Compute A^T A - val fullata = matrix.mapPartitions { - iter => - val localATA = Array.ofDim[Double](n, n) - while (iter.hasNext) { - val row = iter.next() - var i = 0 - while (i < n) { - var j = 0 - while (j < n) { - localATA(i)(j) += row(i) * row(j) - j += 1 - } - i += 1 - } - } - Iterator(localATA) - }.fold(Array.ofDim[Double](n, n)) { - (a, b) => - var i = 0 - while (i < n) { - var j = 0 - while (j < n) { - a(i)(j) += b(i)(j) - j += 1 - } - i += 1 - } - a - } - + val fullata = if (useDIMS) + MatrixAlgebra.squareWithDIMSUM( + matrix, MatrixAlgebra.columnMagnitudes(matrix), gamma) + else + MatrixAlgebra.square(matrix) + // Construct jblas A^T A locally val ata = new DoubleMatrix(fullata) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala index 20e2b0f84be06..60bdbba33ec87 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala @@ -191,4 +191,35 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll { assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0))) assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0)) } + + test("square matrix with DIMSUM") { + val m = 10 + val n = 3 + val datarr = Array.tabulate(m,n){ (a, b) => + MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten + val data = sc.makeRDD(datarr, 3) + + val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n)) + + val decomposed = new SVD().setK(n).setComputeU(true).useDIMSUM(40.0).compute(a) + val u = LAUtils.denseToSparse(decomposed.U) + val v = decomposed.V + val s = decomposed.S + + val denseA = getDenseMatrix(LAUtils.denseToSparse(a)) + val svd = Singular.sparseSVD(denseA) + + val retu = getDenseMatrix(u) + val rets = DoubleMatrix.diag(new DoubleMatrix(s)) + val retv = new DoubleMatrix(v) + + + // check individual decomposition + assertMatrixApproximatelyEquals(retu, svd(0)) + assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1))) + assertMatrixApproximatelyEquals(retv, svd(2)) + + // check multiplication guarantee + assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA) + } } From 0de41063a54578f583cc4956dfbcc836a8b66de0 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Fri, 28 Mar 2014 15:34:07 -0700 Subject: [PATCH 3/5] Documentation and formatting --- .../spark/mllib/linalg/MatrixAlgebra.scala | 46 +++++++++++-------- .../org/apache/spark/mllib/linalg/SVD.scala | 14 +++--- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala index a2ebf7052fc66..2a555c98f846e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala @@ -23,13 +23,16 @@ import org.jblas.DoubleMatrix /** * Efficient matrix operations. - * For example square a given matrix efficiently */ object MatrixAlgebra { - - def columnMagnitudes(matrix: RDD[Array[Double]]): + /** + * Given matrix, compute squares of column magnitudes + * @param matrix given row-by-row, each row an array + * @return an array of squared column magnitudes + */ + def columnMagnitudes(matrix: RDD[Array[Double]]): Array[Double] = { - val n = matrix.first.size + val n = matrix.first.size matrix.map { x => val a = new DoubleMatrix(x) @@ -43,9 +46,14 @@ object MatrixAlgebra { } } - /** - * TODO: javadoc Square a given matrix efficienty + * Given a matrix A, compute A^T A using the sampling procedure + * described in http://arxiv.org/abs/1304.1467 + * @param matrix given row-by-row, each row an array + * @param colMags Euclidean column magnitudes squared + * @param gamma The oversampling parameter, should be set to greater than 1, + * guideline is 2 log(n) + * @return Computed A^T A */ def squareWithDIMSUM(matrix: RDD[Array[Double]], colMags: Array[Double], gamma: Double): Array[Array[Double]] = { @@ -56,14 +64,14 @@ object MatrixAlgebra { } // Compute A^T A - val fullata = matrix.mapPartitions { + val fullATA = matrix.mapPartitions { iter => val localATA = Array.ofDim[Double](n, n) while (iter.hasNext) { val row = iter.next() var i = 0 while (i < n) { - if(Math.random < gamma / colMags(i)) { + if (Math.random < gamma / colMags(i)) { var j = i + 1 while (j < n) { val mult = row(i) * row(j) @@ -91,25 +99,26 @@ object MatrixAlgebra { } // undo normalization - for(i <- 0 until n) for(j <- i until n) { - fullata(i)(j) = if (i == j) colMags(i) - else if (gamma / colMags(i) > 1) fullata(i)(j) - else fullata(i)(j) * colMags(i) / gamma - fullata(j)(i) = fullata(i)(j) + for (i <- 0 until n) for (j <- i until n) { + fullATA(i)(j) = if (i == j) colMags(i) + else if (gamma / colMags(i) > 1) fullATA(i)(j) + else fullATA(i)(j) * colMags(i) / gamma + fullATA(j)(i) = fullATA(i)(j) } - fullata + fullATA } - /** - * TODO: javadoc Square a tall skinny A^TA given matrix efficienty + * Given a matrix A, compute A^T A + * @param matrix given row-by-row, each row an array + * @return Computed A^T A */ def square(matrix: RDD[Array[Double]]): Array[Array[Double]] = { val n = matrix.first.size // Compute A^T A - val fullata = matrix.mapPartitions { + val fullATA = matrix.mapPartitions { iter => val localATA = Array.ofDim[Double](n, n) while (iter.hasNext) { @@ -138,7 +147,6 @@ object MatrixAlgebra { } a } - fullata + fullATA } } - diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala index 030921adbc3d0..dc11f6d83f51a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala @@ -29,7 +29,7 @@ import org.jblas.{DoubleMatrix, Singular, MatrixFunctions} class SVD { private var k = 1 private var computeU = true - + // All singular values smaller than rCond * sigma(0) // are treated as zero, where sigma(0) is the largest singular value. private var rCond = 1e-9 @@ -66,7 +66,7 @@ class SVD { } /** - * Should DIMSUM be used for computing A^TA? + * Use DIMSUM be used for computing A^TA with gamma * See http://arxiv.org/abs/1304.1467 */ def useDIMSUM(gamma: Double): SVD = { @@ -200,11 +200,11 @@ class SVD { // Compute A^T A val fullata = if (useDIMS) - MatrixAlgebra.squareWithDIMSUM( - matrix, MatrixAlgebra.columnMagnitudes(matrix), gamma) - else - MatrixAlgebra.square(matrix) - + MatrixAlgebra.squareWithDIMSUM( + matrix, MatrixAlgebra.columnMagnitudes(matrix), gamma) + else + MatrixAlgebra.square(matrix) + // Construct jblas A^T A locally val ata = new DoubleMatrix(fullata) From bbb4d9cabfe6395905fe074a6201183b25aec490 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Fri, 28 Mar 2014 15:40:03 -0700 Subject: [PATCH 4/5] formatting --- .../src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala index 60bdbba33ec87..286f871d90569 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala @@ -213,7 +213,6 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll { val rets = DoubleMatrix.diag(new DoubleMatrix(s)) val retv = new DoubleMatrix(v) - // check individual decomposition assertMatrixApproximatelyEquals(retu, svd(0)) assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1))) From 599c8f20feae6be916203a00f74ade127efe6968 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Sun, 6 Apr 2014 18:42:26 -0700 Subject: [PATCH 5/5] style changes --- .../src/main/scala/org/apache/spark/mllib/linalg/SVD.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala index dc11f6d83f51a..d2636a53d30e7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala @@ -199,14 +199,15 @@ class SVD { } // Compute A^T A - val fullata = if (useDIMS) + val fullATA = if (useDIMS) { MatrixAlgebra.squareWithDIMSUM( matrix, MatrixAlgebra.columnMagnitudes(matrix), gamma) - else + } else { MatrixAlgebra.square(matrix) + } // Construct jblas A^T A locally - val ata = new DoubleMatrix(fullata) + val ata = new DoubleMatrix(fullATA) // Since A^T A is small, we can compute its SVD directly val svd = Singular.sparseSVD(ata)