diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala new file mode 100644 index 000000000000..2a555c98f846 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.apache.spark.rdd.RDD + +import org.jblas.DoubleMatrix + +/** + * Efficient matrix operations. 
+ */ +object MatrixAlgebra { + /** + * Given matrix, compute squares of column magnitudes + * @param matrix given row-by-row, each row an array + * @return an array of squared column magnitudes + */ + def columnMagnitudes(matrix: RDD[Array[Double]]): + Array[Double] = { + val n = matrix.first.size + matrix.map { + x => + val a = new DoubleMatrix(x) + a.mul(a).data + }.fold(Array.ofDim[Double](n)) { + (a, b) => + val am = new DoubleMatrix(a) + val bm = new DoubleMatrix(b) + am.addi(bm) + a + } + } + + /** + * Given a matrix A, compute A^T A using the sampling procedure + * described in http://arxiv.org/abs/1304.1467 + * @param matrix given row-by-row, each row an array + * @param colMags Euclidean column magnitudes squared + * @param gamma The oversampling parameter, should be set to greater than 1, + * guideline is 2 log(n) + * @return Computed A^T A + */ + def squareWithDIMSUM(matrix: RDD[Array[Double]], colMags: Array[Double], gamma: Double): + Array[Array[Double]] = { + val n = matrix.first.size + + if (gamma < 1) { + throw new IllegalArgumentException(s"Oversampling should be greater than 1: $gamma") + } + + // Compute A^T A + val fullATA = matrix.mapPartitions { + iter => + val localATA = Array.ofDim[Double](n, n) + while (iter.hasNext) { + val row = iter.next() + var i = 0 + while (i < n) { + if (Math.random < gamma / colMags(i)) { + var j = i + 1 + while (j < n) { + val mult = row(i) * row(j) + localATA(i)(j) += mult + localATA(j)(i) += mult + j += 1 + } + } + i += 1 + } + } + Iterator(localATA) + }.fold(Array.ofDim[Double](n, n)) { + (a, b) => + var i = 0 + while (i < n) { + var j = 0 + while (j < n) { + a(i)(j) += b(i)(j) + j += 1 + } + i += 1 + } + a + } + + // undo normalization + for (i <- 0 until n) for (j <- i until n) { + fullATA(i)(j) = if (i == j) colMags(i) + else if (gamma / colMags(i) > 1) fullATA(i)(j) + else fullATA(i)(j) * colMags(i) / gamma + fullATA(j)(i) = fullATA(i)(j) + } + + fullATA + } + + /** + * Given a matrix A, compute A^T A + * 
@param matrix given row-by-row, each row an array + * @return Computed A^T A + */ + def square(matrix: RDD[Array[Double]]): Array[Array[Double]] = { + val n = matrix.first.size + + // Compute A^T A + val fullATA = matrix.mapPartitions { + iter => + val localATA = Array.ofDim[Double](n, n) + while (iter.hasNext) { + val row = iter.next() + var i = 0 + while (i < n) { + var j = 0 + while (j < n) { + localATA(i)(j) += row(i) * row(j) + j += 1 + } + i += 1 + } + } + Iterator(localATA) + }.fold(Array.ofDim[Double](n, n)) { + (a, b) => + var i = 0 + while (i < n) { + var j = 0 + while (j < n) { + a(i)(j) += b(i)(j) + j += 1 + } + i += 1 + } + a + } + fullATA + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala index 3e7cc648d1d3..d2636a53d30e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala @@ -34,6 +34,11 @@ class SVD { // are treated as zero, where sigma(0) is the largest singular value. 
private var rCond = 1e-9 + // Flag for using DIMSUM in A^TA computation + // See http://arxiv.org/abs/1304.1467 + private var useDIMS = false + private var gamma = 10.0 + /** * Set the number of top-k singular vectors to return */ @@ -60,6 +65,17 @@ class SVD { this } + /** + * Use DIMSUM for computing A^TA, with oversampling parameter gamma + * See http://arxiv.org/abs/1304.1467 + */ + def useDIMSUM(gamma: Double): SVD = { + this.useDIMS = true + this.gamma = gamma + this + } + + + /** * Compute SVD using the current set parameters */ @@ -183,38 +199,15 @@ class SVD { } // Compute A^T A - val fullata = matrix.mapPartitions { - iter => - val localATA = Array.ofDim[Double](n, n) - while (iter.hasNext) { - val row = iter.next() - var i = 0 - while (i < n) { - var j = 0 - while (j < n) { - localATA(i)(j) += row(i) * row(j) - j += 1 - } - i += 1 - } - } - Iterator(localATA) - }.fold(Array.ofDim[Double](n, n)) { - (a, b) => - var i = 0 - while (i < n) { - var j = 0 - while (j < n) { - a(i)(j) += b(i)(j) - j += 1 - } - i += 1 - } - a + val fullATA = if (useDIMS) { + MatrixAlgebra.squareWithDIMSUM( + matrix, MatrixAlgebra.columnMagnitudes(matrix), gamma) + } else { + MatrixAlgebra.square(matrix) } // Construct jblas A^T A locally - val ata = new DoubleMatrix(fullata) + val ata = new DoubleMatrix(fullATA) // Since A^T A is small, we can compute its SVD directly val svd = Singular.sparseSVD(ata) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala index 20e2b0f84be0..286f871d9056 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala @@ -191,4 +191,34 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll { assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1).getRow(0))) assertMatrixApproximatelyEquals(retv, svd(2).getColumn(0)) } + + test("square matrix with DIMSUM") { + val m = 10 + val n = 
3 + val datarr = Array.tabulate(m,n){ (a, b) => + MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten + val data = sc.makeRDD(datarr, 3) + + val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n)) + + val decomposed = new SVD().setK(n).setComputeU(true).useDIMSUM(40.0).compute(a) + val u = LAUtils.denseToSparse(decomposed.U) + val v = decomposed.V + val s = decomposed.S + + val denseA = getDenseMatrix(LAUtils.denseToSparse(a)) + val svd = Singular.sparseSVD(denseA) + + val retu = getDenseMatrix(u) + val rets = DoubleMatrix.diag(new DoubleMatrix(s)) + val retv = new DoubleMatrix(v) + + // check individual decomposition + assertMatrixApproximatelyEquals(retu, svd(0)) + assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1))) + assertMatrixApproximatelyEquals(retv, svd(2)) + + // check multiplication guarantee + assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA) + } }