From 5b8cd7deb3f29d3c2533b01f496f41175471f023 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Sun, 3 Aug 2014 19:19:45 -0700 Subject: [PATCH 01/27] Initial files --- .../spark/mllib/linalg/MatrixAlgebra.scala | 151 ++++++++++++++++++ .../spark/mllib/linalg/MatricesSuite.scala | 30 ++++ 2 files changed, 181 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala new file mode 100644 index 0000000000000..df168846bf0eb --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.apache.spark.rdd.RDD +import org.jblas.DoubleMatrix + +/** + * Efficient matrix operations. + */ +object MatrixAlgebra { + /** + * Given matrix, compute squares of column magnitudes + * @param matrix given row-by-row, each row an array + * @return an array of squared column magnitudes + */ + def columnMagnitudes(matrix: RDD[Array[Double]]): + Array[Double] = { + val n = matrix.first().size() + matrix.map { + x => + val a = new DoubleMatrix(x) + a.mul(a).data + }.fold(Array.ofDim[Double](n)) { + (a, b) => + val am = new DoubleMatrix(a) + val bm = new DoubleMatrix(b) + am.addi(bm) + a + } + } + + /** + * Given a matrix A, compute A^T A using the sampling procedure + * described in http://arxiv.org/abs/1304.1467 + * @param matrix given row-by-row, each row an array + * @param colMags Euclidean column magnitudes squared + * @param gamma The oversampling parameter, should be set to greater than 1, + * guideline is 2 log(n) + * @return Computed A^T A + */ + def squareWithDIMSUM(matrix: RDD[Array[Double]], colMags: Array[Double], gamma: Double): + Array[Array[Double]] = { + val n = matrix.first.size + + if (gamma <= 1.0) { + throw new IllegalArgumentException("Oversampling should be greater than 1: $gamma") + } + + // Compute A^T A + val fullATA = matrix.mapPartitions { + iter => + val localATA = Array.ofDim[Double](n, n) + while (iter.hasNext) { + val row = iter.next() + var i = 0 + while (i < n) { + if (Math.random < gamma / colMags(i)) { + var j = i + 1 + while (j < n) { + val mult = row(i) * row(j) + localATA(i)(j) += mult + localATA(j)(i) += mult + j += 1 + } + } + i += 1 + } + } + Iterator(localATA) + }.fold(Array.ofDim[Double](n, n)) { + (a, b) => + var i = 0 + while (i < n) { + var j = 0 + while (j < n) { + a(i)(j) += b(i)(j) + j += 1 + } + i += 1 + } + a + } + + // undo normalization + for (i <- 0 until n) for (j <- i until n) { + fullATA(i)(j) = if (i == j) colMags(i) + else if (gamma / colMags(i) > 1) fullATA(i)(j) + else fullATA(i)(j) * colMags(i) / gamma + fullATA(j)(i) = fullATA(i)(j) + } + + fullATA + } + + /** + * Given a matrix A, compute A^T A + * @param matrix given row-by-row, each row an array + * @return Computed A^T A + */ + def square(matrix: RDD[Array[Double]]): Array[Array[Double]] = { + val n = matrix.first.size + + // Compute A^T A + val fullATA = matrix.mapPartitions { + iter => + val localATA = Array.ofDim[Double](n, n) + while (iter.hasNext) { + val row = iter.next() + var i = 0 + while (i < n) { + var j = 0 + while (j < n) { + localATA(i)(j) += row(i) * row(j) + j += 1 + } + i += 1 + } + } + Iterator(localATA) + }.fold(Array.ofDim[Double](n, n)) { + (a, b) => + var i = 0 + while (i < n) { + var j = 0 + while (j < n) { + a(i)(j) += b(i)(j) + j += 1 + } + i += 1 + } + a + } + fullATA + } +} \ No newline at end of file diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala index 9c66b4db9f16b..4bfe21759250b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala @@ -36,4 +36,34 @@ class MatricesSuite extends FunSuite { Matrices.dense(3, 2, Array(0.0, 1.0, 2.0)) } } + + test("square matrix with DIMSUM") { + val m = 10 + val n = 3 + val datarr = Array.tabulate(m,n){ (a, b) => + MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten + val data = sc.makeRDD(datarr, 3) + + val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n)) + + val decomposed = new SVD().setK(n).setComputeU(true).useDIMSUM(40.0).compute(a) + val u = LAUtils.denseToSparse(decomposed.U) + val v = decomposed.V + val s = decomposed.S + + val denseA = getDenseMatrix(LAUtils.denseToSparse(a)) + val svd = Singular.sparseSVD(denseA) + + val retu = getDenseMatrix(u) + val rets = DoubleMatrix.diag(new DoubleMatrix(s)) + val retv = new DoubleMatrix(v) + + // check individual decomposition + assertMatrixApproximatelyEquals(retu, svd(0)) + assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1))) + assertMatrixApproximatelyEquals(retv, svd(2)) + + // check multiplication guarantee + assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA) + } } From 6bebabb9364eb917dd86acbea4438a9e4d301f18 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Mon, 4 Aug 2014 11:37:31 -0700 Subject: [PATCH 02/27] remove changes to MatrixSuite --- .../spark/mllib/linalg/MatrixAlgebra.scala | 3 +- .../spark/mllib/linalg/MatricesSuite.scala | 30 ------------------- 2 files changed, 2 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala index df168846bf0eb..3f6dfcfd3e64a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala @@ -51,7 +51,8 @@ object MatrixAlgebra { * @param matrix given row-by-row, each row an array * @param colMags Euclidean column magnitudes squared * @param gamma The oversampling parameter, should be set to greater than 1, - * guideline is 2 log(n) + * guideline is 2 log(n) / s where s is the smallest similarity + * to be estimated * @return Computed A^T A */ def squareWithDIMSUM(matrix: RDD[Array[Double]], colMags: Array[Double], gamma: Double): diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala index 4bfe21759250b..9c66b4db9f16b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala @@ -36,34 +36,4 @@ class MatricesSuite extends FunSuite { Matrices.dense(3, 2, Array(0.0, 1.0, 2.0)) } } - - test("square matrix with DIMSUM") { - val m = 10 - val n = 3 - val datarr = Array.tabulate(m,n){ (a, b) => - MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten - val data = sc.makeRDD(datarr, 3) - - val a = LAUtils.sparseToTallSkinnyDense(SparseMatrix(data, m, n)) - - val decomposed = new SVD().setK(n).setComputeU(true).useDIMSUM(40.0).compute(a) - val u = LAUtils.denseToSparse(decomposed.U) - val v = decomposed.V - val s = decomposed.S - - val denseA = getDenseMatrix(LAUtils.denseToSparse(a)) - val svd = Singular.sparseSVD(denseA) - - val retu = getDenseMatrix(u) - val rets = DoubleMatrix.diag(new DoubleMatrix(s)) - val retv = new DoubleMatrix(v) - - // check individual decomposition - assertMatrixApproximatelyEquals(retu, svd(0)) - assertMatrixApproximatelyEquals(rets, DoubleMatrix.diag(svd(1))) - assertMatrixApproximatelyEquals(retv, svd(2)) - - // check multiplication guarantee - assertMatrixApproximatelyEquals(retu.mmul(rets).mmul(retv.transpose), denseA) - } } From 3726ca97ab184a8d5a9b3c0003d3afa6fd973890 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Mon, 4 Aug 2014 13:47:57 -0700 Subject: [PATCH 03/27] Remove MatrixAlgebra --- .../spark/mllib/linalg/MatrixAlgebra.scala | 152 ------------------ .../mllib/linalg/distributed/RowMatrix.scala | 33 ++++ 2 files changed, 33 insertions(+), 152 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala deleted file mode 100644 index 3f6dfcfd3e64a..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixAlgebra.scala +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg - -import org.apache.spark.rdd.RDD -import org.jblas.DoubleMatrix - -/** - * Efficient matrix operations. - */ -object MatrixAlgebra { - /** - * Given matrix, compute squares of column magnitudes - * @param matrix given row-by-row, each row an array - * @return an array of squared column magnitudes - */ - def columnMagnitudes(matrix: RDD[Array[Double]]): - Array[Double] = { - val n = matrix.first().size() - matrix.map { - x => - val a = new DoubleMatrix(x) - a.mul(a).data - }.fold(Array.ofDim[Double](n)) { - (a, b) => - val am = new DoubleMatrix(a) - val bm = new DoubleMatrix(b) - am.addi(bm) - a - } - } - - /** - * Given a matrix A, compute A^T A using the sampling procedure - * described in http://arxiv.org/abs/1304.1467 - * @param matrix given row-by-row, each row an array - * @param colMags Euclidean column magnitudes squared - * @param gamma The oversampling parameter, should be set to greater than 1, - * guideline is 2 log(n) / s where s is the smallest similarity - * to be estimated - * @return Computed A^T A - */ - def squareWithDIMSUM(matrix: RDD[Array[Double]], colMags: Array[Double], gamma: Double): - Array[Array[Double]] = { - val n = matrix.first.size - - if (gamma <= 1.0) { - throw new IllegalArgumentException("Oversampling should be greater than 1: $gamma") - } - - // Compute A^T A - val fullATA = matrix.mapPartitions { - iter => - val localATA = Array.ofDim[Double](n, n) - while (iter.hasNext) { - val row = iter.next() - var i = 0 - while (i < n) { - if (Math.random < gamma / colMags(i)) { - var j = i + 1 - while (j < n) { - val mult = row(i) * row(j) - localATA(i)(j) += mult - localATA(j)(i) += mult - j += 1 - } - } - i += 1 - } - } - Iterator(localATA) - }.fold(Array.ofDim[Double](n, n)) { - (a, b) => - var i = 0 - while (i < n) { - var j = 0 - while (j < n) { - a(i)(j) += b(i)(j) - j += 1 - } - i += 1 - } - a - } - - // undo normalization - for (i <- 0 until n) for (j <- i until n) { - fullATA(i)(j) = if (i == j) colMags(i) - else if (gamma / colMags(i) > 1) fullATA(i)(j) - else fullATA(i)(j) * colMags(i) / gamma - fullATA(j)(i) = fullATA(i)(j) - } - - fullATA - } - - /** - * Given a matrix A, compute A^T A - * @param matrix given row-by-row, each row an array - * @return Computed A^T A - */ - def square(matrix: RDD[Array[Double]]): Array[Array[Double]] = { - val n = matrix.first.size - - // Compute A^T A - val fullATA = matrix.mapPartitions { - iter => - val localATA = Array.ofDim[Double](n, n) - while (iter.hasNext) { - val row = iter.next() - var i = 0 - while (i < n) { - var j = 0 - while (j < n) { - localATA(i)(j) += row(i) * row(j) - j += 1 - } - i += 1 - } - } - Iterator(localATA) - }.fold(Array.ofDim[Double](n, n)) { - (a, b) => - var i = 0 - while (i < n) { - var j = 0 - while (j < n) { - a(i)(j) += b(i)(j) - j += 1 - } - i += 1 - } - a - } - fullATA - } -} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 45486b2c7d82d..a2b04e195815e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -31,6 +31,8 @@ import org.apache.spark.Logging import org.apache.spark.mllib.rdd.RDDFunctions._ import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary} +import scala.collection.mutable.ListBuffer + /** * :: Experimental :: * Represents a row-oriented distributed Matrix with no meaningful row indices. @@ -390,6 +392,37 @@ class RowMatrix( new RowMatrix(AB, nRows, B.numCols) } + + def similarColumnsDIMSUM(colMags: Vector, gamma: Double): + CoordinateMatrix = { + require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma") + + require(colMags.size == this.numCols(), + s"Number of magnitudes ${colMags.size} didn't match column dimension ${numCols()}") + + val sg = math.sqrt(gamma) + + val sims = rows.flatMap { + row => + val buf = new ListBuffer[((Long, Long), Double)]() + row.toBreeze.activeIterator.foreach { + case (_, 0.0) => // Skip explicit zero elements. + case (i, iVal) => + if (Math.random < sg / colMags(i)) { + row.toBreeze.activeIterator.foreach { + case (_, 0.0) => // Skip explicit zero elements. + case (j, jVal) => + if (Math.random < sg / colMags(j)) { + buf += ((i, j), (iVal * jVal) / (math.min(sg, colMags(i)) * math.min(sg, colMags(j)))) + } + } + } + } + buf + }.reduce(_ + _).map{ case ((i, j), sim) => MatrixEntry(i, j, sim) } + CoordinateMatrix(sims, numCols(), numCols()) + } + private[mllib] override def toBreeze(): BDM[Double] = { val m = numRows().toInt val n = numCols().toInt From 654c4fb1136cfa856fc354b5ddb710758d38948f Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Mon, 4 Aug 2014 14:38:18 -0700 Subject: [PATCH 04/27] default methods --- .../mllib/linalg/distributed/RowMatrix.scala | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index a2b04e195815e..be92cbd33eba8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -27,6 +27,7 @@ import com.github.fommil.netlib.BLAS.{getInstance => blas} import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg._ import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ import org.apache.spark.Logging import org.apache.spark.mllib.rdd.RDDFunctions._ import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary} @@ -392,13 +393,26 @@ class RowMatrix( new RowMatrix(AB, nRows, B.numCols) } + def similarColumns(): + CoordinateMatrix = { + similarColumnsDIMSUM(Double.MaxValue) + } + + def similarColumnsDIMSUM(gamma: Double): + CoordinateMatrix = { + val stats = computeColumnSummaryStatistics() + val variance = stats.variance.toArray + val meanSquared = stats.mean.toArray.map(x => x * x) + val colMags = variance.zip(meanSquared).map{case (a:Double, b:Double) => math.sqrt(a + b)} + similarColumnsDIMSUM(colMags, gamma) + } - def similarColumnsDIMSUM(colMags: Vector, gamma: Double): + def similarColumnsDIMSUM(colMags: Array[Double], gamma: Double): CoordinateMatrix = { require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma") require(colMags.size == this.numCols(), - s"Number of magnitudes ${colMags.size} didn't match column dimension ${numCols()}") + "Number of magnitudes didn't match column dimension") val sg = math.sqrt(gamma) @@ -413,14 +427,15 @@ class RowMatrix( case (_, 0.0) => // Skip explicit zero elements. case (j, jVal) => if (Math.random < sg / colMags(j)) { - buf += ((i, j), (iVal * jVal) / (math.min(sg, colMags(i)) * math.min(sg, colMags(j)))) + val contrib = ((i.toLong, j.toLong), (iVal * jVal) / (math.min(sg, colMags(i)) * math.min(sg, colMags(j)))) + buf += contrib } } } } buf - }.reduce(_ + _).map{ case ((i, j), sim) => MatrixEntry(i, j, sim) } - CoordinateMatrix(sims, numCols(), numCols()) + }.reduceByKey(_ + _).map { case ((i, j), sim) => MatrixEntry(i, j, sim)} + new CoordinateMatrix(sims, numCols(), numCols()) } private[mllib] override def toBreeze(): BDM[Double] = { From 502ce526fc8ec84fd2c1f3b2b9a74b07e76c2d65 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Mon, 4 Aug 2014 15:02:36 -0700 Subject: [PATCH 05/27] new interface --- .../apache/spark/mllib/linalg/distributed/RowMatrix.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index be92cbd33eba8..8c0f1eedfabd4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -414,7 +414,7 @@ class RowMatrix( require(colMags.size == this.numCols(), "Number of magnitudes didn't match column dimension") - val sg = math.sqrt(gamma) + val sg = math.sqrt(gamma) // sqrt(gamma) used many times val sims = rows.flatMap { row => @@ -427,7 +427,8 @@ class RowMatrix( case (_, 0.0) => // Skip explicit zero elements. case (j, jVal) => if (Math.random < sg / colMags(j)) { - val contrib = ((i.toLong, j.toLong), (iVal * jVal) / (math.min(sg, colMags(i)) * math.min(sg, colMags(j)))) + val contrib = ((i.toLong, j.toLong), (iVal * jVal) / + (math.min(sg, colMags(i)) * math.min(sg, colMags(j)))) buf += contrib } } From 05e59b8e883fd126dc81707b90aaf1011a2d1ee5 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Mon, 4 Aug 2014 15:59:55 -0700 Subject: [PATCH 06/27] Add test --- .../mllib/linalg/distributed/RowMatrix.scala | 6 ++++++ .../linalg/distributed/RowMatrixSuite.scala | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 8c0f1eedfabd4..97783261bf31c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -398,6 +398,12 @@ class RowMatrix( similarColumnsDIMSUM(Double.MaxValue) } + def similarColumns(threshold: Double): + CoordinateMatrix = { + require(threshold > 0, s"Similarity threshold must be above 0, but set to: $threshold") + similarColumnsDIMSUM(2.0 * math.log(numCols()) / threshold) + } + def similarColumnsDIMSUM(gamma: Double): CoordinateMatrix = { val stats = computeColumnSummaryStatistics() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 325b817980f68..9c5a8c21ec11f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -95,6 +95,24 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { } } + test("similar columns") { + val means = Vectors.dense(4.5, 3.0, 4.0) + val variances = Vectors.dense(15.0, 10.0, 10.0) + val expected = + Matrices.dense(n, n, Array(126.0, 54.0, 72.0, 54.0, 66.0, 78.0, 72.0, 78.0, 94.0)).toBreeze + + for(i <- 0 until n) for(j <- 0 until n) { + val ci = means(i) * means(i) + variances(i) + val cj = means(i) * means(i) + variances(i) + expected(i, j) /= (ci * cj) + } + + for (mat <- Seq(denseMat, sparseMat)) { + val G = mat.similarColumns() + assert(G.toBreeze === expected.toBreeze) + } + } + test("svd of a full-rank matrix") { for (mat <- Seq(denseMat, sparseMat)) { for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) { From 75edb257e33a23f87fa379be597483d12a421626 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Mon, 4 Aug 2014 18:02:33 -0700 Subject: [PATCH 07/27] All tests passing! --- .../mllib/linalg/distributed/RowMatrix.scala | 7 ++--- .../stat/MultivariateOnlineSummarizer.scala | 23 ++++++++++++++++ .../stat/MultivariateStatisticalSummary.scala | 5 ++++ .../linalg/distributed/RowMatrixSuite.scala | 26 +++++++++++++------ 4 files changed, 48 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 97783261bf31c..e3fe060da5f97 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -401,15 +401,12 @@ class RowMatrix( def similarColumns(threshold: Double): CoordinateMatrix = { require(threshold > 0, s"Similarity threshold must be above 0, but set to: $threshold") - similarColumnsDIMSUM(2.0 * math.log(numCols()) / threshold) + similarColumnsDIMSUM(10.0 * math.log(numCols()) / threshold) } def similarColumnsDIMSUM(gamma: Double): CoordinateMatrix = { - val stats = computeColumnSummaryStatistics() - val variance = stats.variance.toArray - val meanSquared = stats.mean.toArray.map(x => x * x) - val colMags = variance.zip(meanSquared).map{case (a:Double, b:Double) => math.sqrt(a + b)} + val colMags = computeColumnSummaryStatistics().magnitude.toArray similarColumnsDIMSUM(colMags, gamma) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 5105b5c37aaaa..cb02ba3cfbaa4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -42,6 +42,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S private var n = 0 private var currMean: BDV[Double] = _ private var currM2n: BDV[Double] = _ + private var currM2: BDV[Double] = _ private var totalCnt: Long = 0 private var nnz: BDV[Double] = _ private var currMax: BDV[Double] = _ @@ -60,6 +61,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S currMean = BDV.zeros[Double](n) currM2n = BDV.zeros[Double](n) + currM2 = BDV.zeros[Double](n) nnz = BDV.zeros[Double](n) currMax = BDV.fill(n)(Double.MinValue) currMin = BDV.fill(n)(Double.MaxValue) @@ -81,6 +83,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S val tmpPrevMean = currMean(i) currMean(i) = (currMean(i) * nnz(i) + value) / (nnz(i) + 1.0) currM2n(i) += (value - currMean(i)) * (value - tmpPrevMean) + currM2(i) += value * value nnz(i) += 1.0 } @@ -114,6 +117,11 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S currM2n(i) += other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) / (nnz(i) + other.nnz(i)) } + // merge m2 together + if (nnz(i) + other.nnz(i) != 0.0) { + currM2(i) += other.currM2(i) + } + if (currMax(i) < other.currMax(i)) { currMax(i) = other.currMax(i) } @@ -127,6 +135,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S this.n = other.n this.currMean = other.currMean.copy this.currM2n = other.currM2n.copy + this.currM2 = other.currM2.copy this.totalCnt = other.totalCnt this.nnz = other.nnz.copy this.currMax = other.currMax.copy @@ -198,4 +207,18 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } Vectors.fromBreeze(currMin) } + + override def magnitude: Vector = { + require(totalCnt > 0, s"Nothing has been added to this summarizer.") + + val realMagnitude = BDV.zeros[Double](n) + + var i = 0 + while (i < currM2.size) { + realMagnitude(i) = math.sqrt(currM2(i)) + i += 1 + } + + Vectors.fromBreeze(realMagnitude) + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala index f9eb343da2b82..55ce943aab29d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala @@ -53,4 +53,9 @@ trait MultivariateStatisticalSummary { * Minimum value of each column. */ def min: Vector + + /** + * Euclidean magnitude of each column + */ + def magnitude: Vector } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 9c5a8c21ec11f..305e169843c31 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -96,20 +96,29 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { } test("similar columns") { - val means = Vectors.dense(4.5, 3.0, 4.0) - val variances = Vectors.dense(15.0, 10.0, 10.0) - val expected = - Matrices.dense(n, n, Array(126.0, 54.0, 72.0, 54.0, 66.0, 78.0, 72.0, 78.0, 94.0)).toBreeze + val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)) + val expected = BDM( + (126.0, 54.0, 72.0), + (54.0, 66.0, 78.0), + (72.0, 78.0, 94.0)) for(i <- 0 until n) for(j <- 0 until n) { - val ci = means(i) * means(i) + variances(i) - val cj = means(i) * means(i) + variances(i) - expected(i, j) /= (ci * cj) + expected(i, j) /= (colMags(i) * colMags(j)) + } + + for (mat <- Seq(denseMat, sparseMat)) { + val G = mat.similarColumnsDIMSUM(150.0) + assert(closeToZero(G.toBreeze() - expected)) } for (mat <- Seq(denseMat, sparseMat)) { val G = mat.similarColumns() - assert(G.toBreeze === expected.toBreeze) + assert(closeToZero(G.toBreeze() - expected)) + } + + for (mat <- Seq(denseMat, sparseMat)) { + val G = mat.similarColumns(0.01) + assert(closeToZero(G.toBreeze() - expected)) } } @@ -208,6 +217,7 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { assert(summary.numNonzeros === Vectors.dense(3.0, 3.0, 4.0), "nnz mismatch") assert(summary.max === Vectors.dense(9.0, 7.0, 8.0), "max mismatch") assert(summary.min === Vectors.dense(0.0, 0.0, 1.0), "column mismatch.") + assert(summary.magnitude === Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)), "magnitude mismatch.") } } } From 029aa9c3d71960cb63293d721b96eebb6bdfcfbf Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Mon, 4 Aug 2014 22:12:40 -0700 Subject: [PATCH 08/27] javadoc and new test --- .../mllib/linalg/distributed/RowMatrix.scala | 28 +++++++++++++++---- .../linalg/distributed/RowMatrixSuite.scala | 2 +- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index e3fe060da5f97..4a75da8f32154 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -393,23 +393,39 @@ class RowMatrix( new RowMatrix(AB, nRows, B.numCols) } + /** + * Find all similar columns using cosine similarity. + * + * @return An n x n sparse matrix of cosine similarities between columns of this matrix. + */ def similarColumns(): CoordinateMatrix = { similarColumnsDIMSUM(Double.MaxValue) } - def similarColumns(threshold: Double): - CoordinateMatrix = { - require(threshold > 0, s"Similarity threshold must be above 0, but set to: $threshold") - similarColumnsDIMSUM(10.0 * math.log(numCols()) / threshold) - } - + /** + * Find all similar columns using the DIMSUM sampling algorithm, described in + * http://arxiv.org/abs/1304.1467 + * + * @param gamma The oversampling parameter. For provable results, set to 4 * log(n) / s, where s is the smallest + * similarity score to be estimated, and n is the number of columns + * @return An n x n sparse matrix of cosine similarities between columns of this matrix. + */ def similarColumnsDIMSUM(gamma: Double): CoordinateMatrix = { val colMags = computeColumnSummaryStatistics().magnitude.toArray similarColumnsDIMSUM(colMags, gamma) } + /** + * Find all similar columns using the DIMSUM sampling algorithm, described in + * http://arxiv.org/abs/1304.1467 + * + * @param colMags A vector of column magnitudes + * @param gamma The oversampling parameter. For provable results, set to 4 * log(n) / s, where s is the smallest + * similarity score to be estimated, and n is the number of columns + * @return An n x n sparse matrix of cosine similarities between columns of this matrix. + */ def similarColumnsDIMSUM(colMags: Array[Double], gamma: Double): CoordinateMatrix = { require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma") diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 305e169843c31..02c57245f9739 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -117,7 +117,7 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { } for (mat <- Seq(denseMat, sparseMat)) { - val G = mat.similarColumns(0.01) + val G = mat.similarColumnsDIMSUM(colMags.toArray, 150.0) assert(closeToZero(G.toBreeze() - expected)) } } From 139c8e1d20274322dfe1c513d6872e47f5eb5138 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Mon, 4 Aug 2014 22:16:23 -0700 Subject: [PATCH 09/27] Syntax changes --- .../apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 02c57245f9739..9fdbae04d192d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -217,7 +217,8 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { assert(summary.numNonzeros === Vectors.dense(3.0, 3.0, 4.0), "nnz mismatch") assert(summary.max === Vectors.dense(9.0, 7.0, 8.0), "max mismatch") assert(summary.min === Vectors.dense(0.0, 0.0, 1.0), "column mismatch.") - assert(summary.magnitude === Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)), "magnitude mismatch.") + assert(summary.magnitude === Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)), + "magnitude mismatch.") } } } From 41e8ece322581c8584d7293bc36c1179be228f54 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Mon, 4 Aug 2014 22:53:19 -0700 Subject: [PATCH 10/27] style changes --- .../spark/mllib/linalg/distributed/RowMatrix.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 4a75da8f32154..2964f4852f144 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -407,8 +407,9 @@ class RowMatrix( * Find all similar columns using the DIMSUM sampling algorithm, described in * http://arxiv.org/abs/1304.1467 * - * @param gamma The oversampling parameter. For provable results, set to 4 * log(n) / s, where s is the smallest - * similarity score to be estimated, and n is the number of columns + * @param gamma The oversampling parameter. For provable results, set to 4 * log(n) / s, + * where s is the smallest similarity score to be estimated, + * and n is the number of columns * @return An n x n sparse matrix of cosine similarities between columns of this matrix. */ def similarColumnsDIMSUM(gamma: Double): @@ -422,8 +423,9 @@ class RowMatrix( * http://arxiv.org/abs/1304.1467 * * @param colMags A vector of column magnitudes - * @param gamma The oversampling parameter. For provable results, set to 4 * log(n) / s, where s is the smallest - * similarity score to be estimated, and n is the number of columns + * @param gamma The oversampling parameter. For provable results, set to 4 * log(n) / s, + * where s is the smallest similarity score to be estimated, + * and n is the number of columns * @return An n x n sparse matrix of cosine similarities between columns of this matrix. */ def similarColumnsDIMSUM(colMags: Array[Double], gamma: Double): From dbc55bafa112bee3ae5127fcce59200ca654d068 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Wed, 6 Aug 2014 14:50:47 -0700 Subject: [PATCH 11/27] Make colMagnitudes a method in RowMatrix --- .../mllib/linalg/distributed/RowMatrix.scala | 16 ++++++++++++-- .../stat/MultivariateOnlineSummarizer.scala | 21 ------------------- .../stat/MultivariateStatisticalSummary.scala | 5 ----- .../linalg/distributed/RowMatrixSuite.scala | 2 -- 4 files changed, 14 insertions(+), 30 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 2964f4852f144..53bf3f83b586b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -393,6 +393,19 @@ class RowMatrix( new RowMatrix(AB, nRows, B.numCols) } + /** + * Return 2-norm of the columns of this matrix. + * @return an array of column magnitudes + */ + def columnMagnitudes(): + Array[Double] = { + rows.map { + x => + val brzX = x.toBreeze + brzX.:*(brzX) + }.fold(BDV.zeros[Double](numCols().toInt))(_ + _).toArray.map(math.sqrt(_)) + } + /** * Find all similar columns using cosine similarity. * @@ -414,8 +427,7 @@ class RowMatrix( */ def similarColumnsDIMSUM(gamma: Double): CoordinateMatrix = { - val colMags = computeColumnSummaryStatistics().magnitude.toArray - similarColumnsDIMSUM(colMags, gamma) + similarColumnsDIMSUM(columnMagnitudes(), gamma) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index cb02ba3cfbaa4..83d49d9818c2c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -42,7 +42,6 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S private var n = 0 private var currMean: BDV[Double] = _ private var currM2n: BDV[Double] = _ - private var currM2: BDV[Double] = _ private var totalCnt: Long = 0 private var nnz: BDV[Double] = _ private var currMax: BDV[Double] = _ @@ -83,7 +82,6 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S val tmpPrevMean = currMean(i) currMean(i) = (currMean(i) * nnz(i) + value) / (nnz(i) + 1.0) currM2n(i) += (value - currMean(i)) * (value - tmpPrevMean) - currM2(i) += value * value nnz(i) += 1.0 } @@ -117,10 +115,6 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S currM2n(i) += other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) / (nnz(i) + other.nnz(i)) } - // merge m2 together - if (nnz(i) + other.nnz(i) != 0.0) { - currM2(i) += other.currM2(i) - } if (currMax(i) < other.currMax(i)) { currMax(i) = other.currMax(i) @@ -135,7 +129,6 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S this.n = other.n this.currMean = other.currMean.copy this.currM2n = other.currM2n.copy - this.currM2 = other.currM2.copy this.totalCnt = other.totalCnt this.nnz = other.nnz.copy this.currMax = other.currMax.copy @@ -207,18 +200,4 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } Vectors.fromBreeze(currMin) } - - override def magnitude: Vector = { - require(totalCnt > 0, s"Nothing has been added to this summarizer.") - - val realMagnitude = BDV.zeros[Double](n) - - var i = 0 - while (i < currM2.size) { - realMagnitude(i) = math.sqrt(currM2(i)) - i += 1 - } - - Vectors.fromBreeze(realMagnitude) - } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala index 55ce943aab29d..f9eb343da2b82 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala @@ -53,9 +53,4 @@ trait MultivariateStatisticalSummary { * Minimum value of each column. */ def min: Vector - - /** - * Euclidean magnitude of each column - */ - def magnitude: Vector } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 9fdbae04d192d..462f61ccdfcf5 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -217,8 +217,6 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { assert(summary.numNonzeros === Vectors.dense(3.0, 3.0, 4.0), "nnz mismatch") assert(summary.max === Vectors.dense(9.0, 7.0, 8.0), "max mismatch") assert(summary.min === Vectors.dense(0.0, 0.0, 1.0), "column mismatch.") - assert(summary.magnitude === Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)), - "magnitude mismatch.") } } } From f56a882086d913780928443aa7910eab24474359 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Wed, 6 Aug 2014 14:53:52 -0700 Subject: [PATCH 12/27] Remove changes to MultivariateOnlineSummarizer --- .../apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 83d49d9818c2c..5105b5c37aaaa 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -60,7 +60,6 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S currMean = BDV.zeros[Double](n) currM2n = BDV.zeros[Double](n) - currM2 = BDV.zeros[Double](n) nnz = BDV.zeros[Double](n) currMax = BDV.fill(n)(Double.MinValue) currMin = BDV.fill(n)(Double.MaxValue) @@ -115,7 +114,6 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S currM2n(i) += other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) / (nnz(i) + other.nnz(i)) } - if (currMax(i) < other.currMax(i)) { currMax(i) = other.currMax(i) } From eb1dc204e03752ba3809068547f5a8d872468bf7 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Wed, 6 Aug 2014 22:49:35 -0700 Subject: [PATCH 13/27] Use Double.PositiveInfinity instead of Double.Max --- .../org/apache/spark/mllib/linalg/distributed/RowMatrix.scala | 4 ++-- .../spark/mllib/linalg/distributed/RowMatrixSuite.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 53bf3f83b586b..c33e6ec63e137 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -413,7 +413,7 @@ class RowMatrix( */ def similarColumns(): CoordinateMatrix = { - similarColumnsDIMSUM(Double.MaxValue) + similarColumns(Double.PositiveInfinity) } /** @@ -425,7 +425,7 @@ class RowMatrix( * and n is the number of columns * @return An n x n sparse matrix of cosine similarities between columns of this matrix. */ - def similarColumnsDIMSUM(gamma: Double): + def similarColumns(gamma: Double): CoordinateMatrix = { similarColumnsDIMSUM(columnMagnitudes(), gamma) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 462f61ccdfcf5..13f2b623ddbbd 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -107,7 +107,7 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { } for (mat <- Seq(denseMat, sparseMat)) { - val G = mat.similarColumnsDIMSUM(150.0) + val G = mat.similarColumns(150.0) assert(closeToZero(G.toBreeze() - expected)) } From 0f12adef693023a531aac95909c57723dba4b237 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Sat, 30 Aug 2014 18:05:49 -0700 Subject: [PATCH 14/27] Style changes --- .../mllib/linalg/distributed/RowMatrix.scala | 76 +++++++++---------- 1 file changed, 35 insertions(+), 41 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index c33e6ec63e137..ff1878162c573 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -393,26 +393,12 @@ class RowMatrix( new RowMatrix(AB, nRows, B.numCols) } - /** - * Return 2-norm of the columns of this matrix. - * @return an array of column magnitudes - */ - def columnMagnitudes(): - Array[Double] = { - rows.map { - x => - val brzX = x.toBreeze - brzX.:*(brzX) - }.fold(BDV.zeros[Double](numCols().toInt))(_ + _).toArray.map(math.sqrt(_)) - } - /** * Find all similar columns using cosine similarity. * * @return An n x n sparse matrix of cosine similarities between columns of this matrix. */ - def similarColumns(): - CoordinateMatrix = { + def similarColumns(): CoordinateMatrix = { similarColumns(Double.PositiveInfinity) } @@ -425,11 +411,21 @@ class RowMatrix( * and n is the number of columns * @return An n x n sparse matrix of cosine similarities between columns of this matrix. */ - def similarColumns(gamma: Double): - CoordinateMatrix = { + def similarColumns(gamma: Double): CoordinateMatrix = { similarColumnsDIMSUM(columnMagnitudes(), gamma) } + /** + * Return 2-norm of the columns of this matrix. + * @return an array of column magnitudes + */ + def columnMagnitudes(): Array[Double] = { + rows.map { x => + val brzX = x.toBreeze + brzX.:*(brzX) + }.fold(BDV.zeros[Double](numCols().toInt))(_ + _).toArray.map(math.sqrt(_)) + } + /** * Find all similar columns using the DIMSUM sampling algorithm, described in * http://arxiv.org/abs/1304.1467 @@ -440,35 +436,33 @@ class RowMatrix( * and n is the number of columns * @return An n x n sparse matrix of cosine similarities between columns of this matrix. */ - def similarColumnsDIMSUM(colMags: Array[Double], gamma: Double): - CoordinateMatrix = { + def similarColumnsDIMSUM(colMags: Array[Double], gamma: Double): CoordinateMatrix = { require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma") - - require(colMags.size == this.numCols(), - "Number of magnitudes didn't match column dimension") + require(colMags.size == this.numCols(), "Number of magnitudes didn't match column dimension") val sg = math.sqrt(gamma) // sqrt(gamma) used many times - val sims = rows.flatMap { - row => - val buf = new ListBuffer[((Long, Long), Double)]() - row.toBreeze.activeIterator.foreach { - case (_, 0.0) => // Skip explicit zero elements. - case (i, iVal) => - if (Math.random < sg / colMags(i)) { - row.toBreeze.activeIterator.foreach { - case (_, 0.0) => // Skip explicit zero elements. - case (j, jVal) => - if (Math.random < sg / colMags(j)) { - val contrib = ((i.toLong, j.toLong), (iVal * jVal) / - (math.min(sg, colMags(i)) * math.min(sg, colMags(j)))) - buf += contrib - } - } + val sims = rows.flatMap { row => + val buf = new ListBuffer[((Long, Long), Double)]() + row.toBreeze.activeIterator.foreach { + case (_, 0.0) => // Skip explicit zero elements. + case (i, iVal) => + if (Math.random < sg / colMags(i)) { + row.toBreeze.activeIterator.foreach { + case (_, 0.0) => // Skip explicit zero elements. + case (j, jVal) => + if (Math.random < sg / colMags(j)) { + val contrib = ((i.toLong, j.toLong), (iVal * jVal) / + (math.min(sg, colMags(i)) * math.min(sg, colMags(j)))) + buf += contrib + } } - } - buf - }.reduceByKey(_ + _).map { case ((i, j), sim) => MatrixEntry(i, j, sim)} + } + } + buf + }.reduceByKey(_ + _).map { case ((i, j), sim) => + MatrixEntry(i, j, sim) + } new CoordinateMatrix(sims, numCols(), numCols()) } From 75a0b51d37d51514e0e7d38d4113e14ce5bfa889 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Sat, 30 Aug 2014 18:30:04 -0700 Subject: [PATCH 15/27] Use Ints instead of Longs in the shuffle --- .../apache/spark/mllib/linalg/distributed/RowMatrix.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index ff1878162c573..34760b7a72909 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -443,7 +443,7 @@ class RowMatrix( val sg = math.sqrt(gamma) // sqrt(gamma) used many times val sims = rows.flatMap { row => - val buf = new ListBuffer[((Long, Long), Double)]() + val buf = new ListBuffer[((Int, Int), Double)]() row.toBreeze.activeIterator.foreach { case (_, 0.0) => // Skip explicit zero elements. case (i, iVal) => @@ -452,7 +452,7 @@ class RowMatrix( case (_, 0.0) => // Skip explicit zero elements. case (j, jVal) => if (Math.random < sg / colMags(j)) { - val contrib = ((i.toLong, j.toLong), (iVal * jVal) / + val contrib = ((i, j), (iVal * jVal) / (math.min(sg, colMags(i)) * math.min(sg, colMags(j)))) buf += contrib } @@ -461,7 +461,7 @@ class RowMatrix( } buf }.reduceByKey(_ + _).map { case ((i, j), sim) => - MatrixEntry(i, j, sim) + MatrixEntry(i.toLong, j.toLong, sim) } new CoordinateMatrix(sims, numCols(), numCols()) } From 613f2614f44352d6b7cd29712a83f2d33009acd0 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Wed, 10 Sep 2014 18:06:28 -0700 Subject: [PATCH 16/27] Column magnitude summary --- .../mllib/linalg/distributed/RowMatrix.scala | 16 +++--------- .../stat/MultivariateOnlineSummarizer.scala | 25 ++++++++++++++++++- .../stat/MultivariateStatisticalSummary.scala | 5 ++++ .../linalg/distributed/RowMatrixSuite.scala | 10 +++++--- 4 files changed, 38 insertions(+), 18 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 34760b7a72909..62a8f2571c9a0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.linalg.distributed import java.util.Arrays +import scala.collection.mutable.ListBuffer import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV} import breeze.linalg.{svd => brzSvd, axpy => brzAxpy} @@ -32,7 +33,6 @@ import org.apache.spark.Logging import org.apache.spark.mllib.rdd.RDDFunctions._ import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary} -import scala.collection.mutable.ListBuffer /** * :: Experimental :: @@ -412,18 +412,8 @@ class RowMatrix( * @return An n x n sparse matrix of cosine similarities between columns of this matrix. */ def similarColumns(gamma: Double): CoordinateMatrix = { - similarColumnsDIMSUM(columnMagnitudes(), gamma) - } - - /** - * Return 2-norm of the columns of this matrix. - * @return an array of column magnitudes - */ - def columnMagnitudes(): Array[Double] = { - rows.map { x => - val brzX = x.toBreeze - brzX.:*(brzX) - }.fold(BDV.zeros[Double](numCols().toInt))(_ + _).toArray.map(math.sqrt(_)) + val colMags = computeColumnSummaryStatistics().magnitude.toArray + similarColumnsDIMSUM(colMags, gamma) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 5105b5c37aaaa..9a09de3820c61 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -42,6 +42,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S private var n = 0 private var currMean: BDV[Double] = _ private var currM2n: BDV[Double] = _ + private var currM2: BDV[Double] = _ private var totalCnt: Long = 0 private var nnz: BDV[Double] = _ private var currMax: BDV[Double] = _ @@ -60,6 +61,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S currMean = BDV.zeros[Double](n) currM2n = BDV.zeros[Double](n) + currM2 = BDV.zeros[Double](n) nnz = BDV.zeros[Double](n) currMax = BDV.fill(n)(Double.MinValue) currMin = BDV.fill(n)(Double.MaxValue) @@ -81,6 +83,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S val tmpPrevMean = currMean(i) currMean(i) = (currMean(i) * nnz(i) + value) / (nnz(i) + 1.0) currM2n(i) += (value - currMean(i)) * (value - tmpPrevMean) + currM2(i) += value * value nnz(i) += 1.0 } @@ -97,7 +100,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S * @return This MultivariateOnlineSummarizer object. */ def merge(other: MultivariateOnlineSummarizer): this.type = { - if (this.totalCnt != 0 && other.totalCnt != 0) { + if (this.totalCnt != 0 && other.totalCnt != 0) { require(n == other.n, s"Dimensions mismatch when merging with another summarizer. " + s"Expecting $n but got ${other.n}.") totalCnt += other.totalCnt @@ -114,6 +117,11 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S currM2n(i) += other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) / (nnz(i) + other.nnz(i)) } + // merge m2 together + if (nnz(i) + other.nnz(i) != 0.0) { + currM2(i) += other.currM2(i) + } + if (currMax(i) < other.currMax(i)) { currMax(i) = other.currMax(i) } @@ -127,6 +135,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S this.n = other.n this.currMean = other.currMean.copy this.currM2n = other.currM2n.copy + this.currM2 = other.currM2.copy this.totalCnt = other.totalCnt this.nnz = other.nnz.copy this.currMax = other.currMax.copy @@ -198,4 +207,18 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } Vectors.fromBreeze(currMin) } + + override def magnitude: Vector = { + require(totalCnt > 0, s"Nothing has been added to this summarizer.") + + val realMagnitude = BDV.zeros[Double](n) + + var i = 0 + while (i < currM2.size) { + realMagnitude(i) = math.sqrt(currM2(i)) + i += 1 + } + + Vectors.fromBreeze(realMagnitude) + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala index f9eb343da2b82..55ce943aab29d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala @@ -53,4 +53,9 @@ trait MultivariateStatisticalSummary { * Minimum value of each column. */ def min: Vector + + /** + * Euclidean magnitude of each column + */ + def magnitude: Vector } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 13f2b623ddbbd..b80557022914c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -98,16 +98,16 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { test("similar columns") { val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)) val expected = BDM( - (126.0, 54.0, 72.0), - (54.0, 66.0, 78.0), - (72.0, 78.0, 94.0)) + (126.0, 54.0, 72.0), + (54.0, 66.0, 78.0), + (72.0, 78.0, 94.0)) for(i <- 0 until n) for(j <- 0 until n) { expected(i, j) /= (colMags(i) * colMags(j)) } for (mat <- Seq(denseMat, sparseMat)) { - val G = mat.similarColumns(150.0) + val G = mat.similarColumnsDIMSUM(150.0) assert(closeToZero(G.toBreeze() - expected)) } @@ -217,6 +217,8 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { assert(summary.numNonzeros === Vectors.dense(3.0, 3.0, 4.0), "nnz mismatch") assert(summary.max === Vectors.dense(9.0, 7.0, 8.0), "max mismatch") assert(summary.min === Vectors.dense(0.0, 0.0, 1.0), "column mismatch.") + assert(summary.magnitude === Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)), + "magnitude mismatch.") } } } From e9c67911610d18ccfd5976e40c7b54f3250c100b Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Sun, 14 Sep 2014 15:38:03 -0700 Subject: [PATCH 17/27] New interface and documentation --- .../mllib/linalg/distributed/RowMatrix.scala | 78 ++++++++++++++----- .../stat/MultivariateOnlineSummarizer.scala | 13 ++++ .../stat/MultivariateStatisticalSummary.scala | 5 ++ .../linalg/distributed/RowMatrixSuite.scala | 20 +++-- 4 files changed, 89 insertions(+), 27 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 62a8f2571c9a0..1b3c5eb46cd93 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -394,39 +394,73 @@ class RowMatrix( } /** - * Find all similar columns using cosine similarity. + * Compute all similarities between columns of this matrix using the brute-force + * approach of computing normalized dot products. * - * @return An n x n sparse matrix of cosine similarities between columns of this matrix. + * @return An n x n sparse upper-triangular matrix of cosine similarities between columns of this matrix. */ - def similarColumns(): CoordinateMatrix = { - similarColumns(Double.PositiveInfinity) + def columnSimilarities(): CoordinateMatrix = { + similarColumns(0.0) } /** - * Find all similar columns using the DIMSUM sampling algorithm, described in - * http://arxiv.org/abs/1304.1467 + * Compute all similarities between columns of this matrix using a sampling approach. * - * @param gamma The oversampling parameter. For provable results, set to 4 * log(n) / s, - * where s is the smallest similarity score to be estimated, - * and n is the number of columns - * @return An n x n sparse matrix of cosine similarities between columns of this matrix. + * The threshold parameter is a trade-off knob between correctness and computational cost. + * + * Setting a threshold of 0 guarantees deterministic correct results, but comes at exactly + * the same cost as the brute-force approach. Setting the threshold to positive values + * incurs strictly less computational cost than the brute-force aproach, however the + * similarities computed will be estimates. + * + * The sampling guarantees correctness for those pairs of columns that have + * similarity greater than the given similarity threshold. + * + * To describe the guarantee, we set some notation: + * Let A be the smallest in magnitude non-zero element of this matrix. + * Let B be the largest in magnitude non-zero element of this matrix. + * Let L be the number of non-zeros per row. + * + * For example, for {0,1} matrices: A=B=1. + * Another example, for the Netflix matrix: A=1, B=5 + * + * For those column pairs that are above the threshold, + * the computed similarity is correct to within 20% relative error with probability + * at least 1 - (0.981)^(100/B) + * + * The shuffle size is bounded by the *smaller* of the following two expressions: + * + * O(n log(n) L / (threshold * A)) + * O(m L^2) + * + * The latter is the cost of the brute-force approach, so for non-zero thresholds, + * the cost is always cheaper than the brute-force approach. + * + * @param threshold Similarities above this threshold are probably computed correctly. + * Set to 0 for deterministic guaranteed correctness. + * @return An n x n sparse upper-triangular matrix of cosine similarities between columns of this matrix. */ - def similarColumns(gamma: Double): CoordinateMatrix = { - val colMags = computeColumnSummaryStatistics().magnitude.toArray - similarColumnsDIMSUM(colMags, gamma) + def similarColumns(threshold: Double): CoordinateMatrix = { + require(threshold >= 0 && threshold <= 1, s"Threshold not in [0,1]: $threshold") + + val gamma = if (math.abs(threshold) < 1e-6) Double.PositiveInfinity else 100 * math.log(numCols()) / threshold + + similarColumnsDIMSUM(computeColumnSummaryStatistics().magnitude.toArray, gamma) } /** - * Find all similar columns using the DIMSUM sampling algorithm, described in + * Find all similar columns using the DIMSUM sampling algorithm, described in two papers + * + * http://arxiv.org/abs/1206.2082 * http://arxiv.org/abs/1304.1467 * * @param colMags A vector of column magnitudes - * @param gamma The oversampling parameter. For provable results, set to 4 * log(n) / s, + * @param gamma The oversampling parameter. For provable results, set to 100 * log(n) / s, * where s is the smallest similarity score to be estimated, * and n is the number of columns - * @return An n x n sparse matrix of cosine similarities between columns of this matrix. + * @return An n x n sparse upper-triangular matrix of cosine similarities between columns of this matrix. */ - def similarColumnsDIMSUM(colMags: Array[Double], gamma: Double): CoordinateMatrix = { + private[mllib] def similarColumnsDIMSUM(colMags: Array[Double], gamma: Double): CoordinateMatrix = { require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma") require(colMags.size == this.numCols(), "Number of magnitudes didn't match column dimension") @@ -437,13 +471,15 @@ class RowMatrix( row.toBreeze.activeIterator.foreach { case (_, 0.0) => // Skip explicit zero elements. case (i, iVal) => - if (Math.random < sg / colMags(i)) { + val rand = new scala.util.Random(iVal.toLong) + val ci = colMags(i) + if (rand.nextDouble < sg / ci) { row.toBreeze.activeIterator.foreach { case (_, 0.0) => // Skip explicit zero elements. case (j, jVal) => - if (Math.random < sg / colMags(j)) { - val contrib = ((i, j), (iVal * jVal) / - (math.min(sg, colMags(i)) * math.min(sg, colMags(j)))) + val cj = colMags(j) + if (i < j && rand.nextDouble < sg / cj) { + val contrib = ((i, j), (iVal * jVal) / (math.min(sg, ci) * math.min(sg, cj))) buf += contrib } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 9a09de3820c61..6c505bbc8c7ab 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -43,6 +43,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S private var currMean: BDV[Double] = _ private var currM2n: BDV[Double] = _ private var currM2: BDV[Double] = _ + private var currL1: BDV[Double] = _ private var totalCnt: Long = 0 private var nnz: BDV[Double] = _ private var currMax: BDV[Double] = _ @@ -62,6 +63,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S currMean = BDV.zeros[Double](n) currM2n = BDV.zeros[Double](n) currM2 = BDV.zeros[Double](n) + currL1 = BDV.zeros[Double](n) nnz = BDV.zeros[Double](n) currMax = BDV.fill(n)(Double.MinValue) currMin = BDV.fill(n)(Double.MaxValue) @@ -84,6 +86,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S currMean(i) = (currMean(i) * nnz(i) + value) / (nnz(i) + 1.0) currM2n(i) += (value - currMean(i)) * (value - tmpPrevMean) currM2(i) += value * value + currL1(i) += math.abs(value) nnz(i) += 1.0 } @@ -121,6 +124,10 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S if (nnz(i) + other.nnz(i) != 0.0) { currM2(i) += other.currM2(i) } + // merge l2 together + if (nnz(i) + other.nnz(i) != 0.0) { + currL1(i) += other.currL1(i) + } if (currMax(i) < other.currMax(i)) { currMax(i) = other.currMax(i) @@ -136,6 +143,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S this.currMean = other.currMean.copy this.currM2n = other.currM2n.copy this.currM2 = other.currM2.copy + this.currL1 = other.currL1.copy this.totalCnt = other.totalCnt this.nnz = other.nnz.copy this.currMax = other.currMax.copy @@ -221,4 +229,9 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S Vectors.fromBreeze(realMagnitude) } + + override def normL1: Vector = { + require(totalCnt > 0, s"Nothing has been added to this summarizer.") + Vectors.fromBreeze(currL1) + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala index 55ce943aab29d..587285368f143 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala @@ -58,4 +58,9 @@ trait MultivariateStatisticalSummary { * Euclidean magnitude of each column */ def magnitude: Vector + + /** + * L1 norm of each column + */ + def normL1: Vector } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index b80557022914c..c486d54fffa22 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -98,21 +98,28 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { test("similar columns") { val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)) val expected = BDM( - (126.0, 54.0, 72.0), - (54.0, 66.0, 78.0), - (72.0, 78.0, 94.0)) + (0.0, 54.0, 72.0), + (0.0, 0.0, 78.0), + (0.0, 0.0, 0.0)) for(i <- 0 until n) for(j <- 0 until n) { expected(i, j) /= (colMags(i) * colMags(j)) } for (mat <- Seq(denseMat, sparseMat)) { - val G = mat.similarColumnsDIMSUM(150.0) - assert(closeToZero(G.toBreeze() - expected)) + val G = mat.similarColumns(0.1).toBreeze() + for(i <- 0 until n) for(j <- 0 until n) { + if (expected(i, j) > 0) { + val actual = expected(i, j) + val estimate = G(i, j) + assert(math.abs(actual - estimate) / actual < 0.1, + s"Similarities not close enough: $actual vs $estimate") + } + } } for (mat <- Seq(denseMat, sparseMat)) { - val G = mat.similarColumns() + val G = mat.columnSimilarities() assert(closeToZero(G.toBreeze() - expected)) } @@ -219,6 +226,7 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { assert(summary.min === Vectors.dense(0.0, 0.0, 1.0), "column mismatch.") assert(summary.magnitude === Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)), "magnitude mismatch.") + assert(summary.normL1 === Vectors.dense(18.0, 12.0, 16.0), "L1 norm mismatch") } } } From 3764983783b3cdbaeec37e7d637f0cfa004d759e Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Sun, 14 Sep 2014 15:46:37 -0700 Subject: [PATCH 18/27] Documentation --- .../apache/spark/mllib/linalg/distributed/RowMatrix.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 1b3c5eb46cd93..c9cb0f8a4d619 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -394,7 +394,7 @@ class RowMatrix( } /** - * Compute all similarities between columns of this matrix using the brute-force + * Compute all cosine similarities between columns of this matrix using the brute-force * approach of computing normalized dot products. * * @return An n x n sparse upper-triangular matrix of cosine similarities between columns of this matrix. @@ -406,14 +406,14 @@ class RowMatrix( /** * Compute all similarities between columns of this matrix using a sampling approach. * - * The threshold parameter is a trade-off knob between correctness and computational cost. + * The threshold parameter is a trade-off knob between estimate quality and computational cost. * * Setting a threshold of 0 guarantees deterministic correct results, but comes at exactly * the same cost as the brute-force approach. Setting the threshold to positive values - * incurs strictly less computational cost than the brute-force aproach, however the + * incurs strictly less computational cost than the brute-force approach, however the * similarities computed will be estimates. * - * The sampling guarantees correctness for those pairs of columns that have + * The sampling guarantees relative-error correctness for those pairs of columns that have * similarity greater than the given similarity threshold. * * To describe the guarantee, we set some notation: From fb296f692da0bfefc4c282466539f2b67f121b43 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Sun, 14 Sep 2014 15:52:20 -0700 Subject: [PATCH 19/27] renamed to normL1 and normL2 --- .../org/apache/spark/mllib/linalg/distributed/RowMatrix.scala | 2 +- .../apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala | 2 +- .../spark/mllib/stat/MultivariateStatisticalSummary.scala | 2 +- .../apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index c9cb0f8a4d619..6785d99edf8f4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -445,7 +445,7 @@ class RowMatrix( val gamma = if (math.abs(threshold) < 1e-6) Double.PositiveInfinity else 100 * math.log(numCols()) / threshold - similarColumnsDIMSUM(computeColumnSummaryStatistics().magnitude.toArray, gamma) + similarColumnsDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 6c505bbc8c7ab..37dc8f0d37788 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -216,7 +216,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S Vectors.fromBreeze(currMin) } - override def magnitude: Vector = { + override def normL2: Vector = { require(totalCnt > 0, s"Nothing has been added to this summarizer.") val realMagnitude = BDV.zeros[Double](n) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala index 587285368f143..6a364c93284af 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala @@ -57,7 +57,7 @@ trait MultivariateStatisticalSummary { /** * Euclidean magnitude of each column */ - def magnitude: Vector + def normL2: Vector /** * L1 norm of each column diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index c486d54fffa22..06e685c4c8edd 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -224,7 +224,7 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { assert(summary.numNonzeros === Vectors.dense(3.0, 3.0, 4.0), "nnz mismatch") assert(summary.max === Vectors.dense(9.0, 7.0, 8.0), "max mismatch") assert(summary.min === Vectors.dense(0.0, 0.0, 1.0), "column mismatch.") - assert(summary.magnitude === Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)), + assert(summary.normL2 === Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)), "magnitude mismatch.") assert(summary.normL1 === Vectors.dense(18.0, 12.0, 16.0), "L1 norm mismatch") } From 25e9d0d9a373d1e0555e5a8a6fa9106f8ba17d8e Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Sun, 14 Sep 2014 16:08:00 -0700 Subject: [PATCH 20/27] Line length for style --- .../mllib/linalg/distributed/RowMatrix.scala | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 6785d99edf8f4..142fca0cd1d9d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -397,7 +397,8 @@ class RowMatrix( * Compute all cosine similarities between columns of this matrix using the brute-force * approach of computing normalized dot products. * - * @return An n x n sparse upper-triangular matrix of cosine similarities between columns of this matrix. + * @return An n x n sparse upper-triangular matrix of cosine similarities between + * columns of this matrix. */ def columnSimilarities(): CoordinateMatrix = { similarColumns(0.0) @@ -438,12 +439,17 @@ class RowMatrix( * * @param threshold Similarities above this threshold are probably computed correctly. * Set to 0 for deterministic guaranteed correctness. - * @return An n x n sparse upper-triangular matrix of cosine similarities between columns of this matrix. + * @return An n x n sparse upper-triangular matrix of cosine similarities + * between columns of this matrix. */ def similarColumns(threshold: Double): CoordinateMatrix = { require(threshold >= 0 && threshold <= 1, s"Threshold not in [0,1]: $threshold") - val gamma = if (math.abs(threshold) < 1e-6) Double.PositiveInfinity else 100 * math.log(numCols()) / threshold + val gamma = if (math.abs(threshold) < 1e-6) { + Double.PositiveInfinity + } else { + 100 * math.log(numCols()) / threshold + } similarColumnsDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma) } @@ -458,9 +464,11 @@ class RowMatrix( * @param gamma The oversampling parameter. For provable results, set to 100 * log(n) / s, * where s is the smallest similarity score to be estimated, * and n is the number of columns - * @return An n x n sparse upper-triangular matrix of cosine similarities between columns of this matrix. + * @return An n x n sparse upper-triangular matrix of cosine similarities + * between columns of this matrix. */ - private[mllib] def similarColumnsDIMSUM(colMags: Array[Double], gamma: Double): CoordinateMatrix = { + private[mllib] def similarColumnsDIMSUM(colMags: Array[Double], + gamma: Double): CoordinateMatrix = { require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma") require(colMags.size == this.numCols(), "Number of magnitudes didn't match column dimension") From 251bb9ce27a937444861f4e713d7912796feecb7 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Sun, 14 Sep 2014 16:40:57 -0700 Subject: [PATCH 21/27] Documentation --- .../apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 37dc8f0d37788..0310dd1f50695 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -124,7 +124,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S if (nnz(i) + other.nnz(i) != 0.0) { currM2(i) += other.currM2(i) } - // merge l2 together + // merge l1 together if (nnz(i) + other.nnz(i) != 0.0) { currL1(i) += other.currL1(i) } From 0e4eda4634ec7ef5c480c55aacd64ae2ae938c42 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Sat, 20 Sep 2014 00:57:55 -0700 Subject: [PATCH 22/27] Use partition index for RNG --- .../mllib/linalg/distributed/RowMatrix.scala | 57 ++++++++++--------- .../linalg/distributed/RowMatrixSuite.scala | 8 +-- 2 files changed, 34 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 142fca0cd1d9d..b3398626d488d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.linalg.distributed import java.util.Arrays + import scala.collection.mutable.ListBuffer import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV} @@ -33,7 +34,6 @@ import org.apache.spark.Logging import org.apache.spark.mllib.rdd.RDDFunctions._ import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary} - /** * :: Experimental :: * Represents a row-oriented distributed Matrix with no meaningful row indices. @@ -405,7 +405,7 @@ class RowMatrix( } /** - * Compute all similarities between columns of this matrix using a sampling approach. + * Compute similarities between columns of this matrix using a sampling approach. * * The threshold parameter is a trade-off knob between estimate quality and computational cost. * @@ -420,14 +420,14 @@ class RowMatrix( * To describe the guarantee, we set some notation: * Let A be the smallest in magnitude non-zero element of this matrix. * Let B be the largest in magnitude non-zero element of this matrix. - * Let L be the number of non-zeros per row. + * Let L be the maximum number of non-zeros per row. * * For example, for {0,1} matrices: A=B=1. * Another example, for the Netflix matrix: A=1, B=5 * * For those column pairs that are above the threshold, * the computed similarity is correct to within 20% relative error with probability - * at least 1 - (0.981)^(100/B) + * at least 1 - (0.981)^(10/B) * * The shuffle size is bounded by the *smaller* of the following two expressions: * @@ -437,18 +437,19 @@ class RowMatrix( * The latter is the cost of the brute-force approach, so for non-zero thresholds, * the cost is always cheaper than the brute-force approach. * - * @param threshold Similarities above this threshold are probably computed correctly. - * Set to 0 for deterministic guaranteed correctness. + * @param threshold Set to 0 for deterministic guaranteed correctness. + * Similarities above this threshold are estimated + * with the cost vs estimate quality trade-off described above. * @return An n x n sparse upper-triangular matrix of cosine similarities * between columns of this matrix. */ def similarColumns(threshold: Double): CoordinateMatrix = { require(threshold >= 0 && threshold <= 1, s"Threshold not in [0,1]: $threshold") - val gamma = if (math.abs(threshold) < 1e-6) { + val gamma = if (threshold < 1e-6) { Double.PositiveInfinity } else { - 100 * math.log(numCols()) / threshold + 10 * math.log(numCols()) / threshold } similarColumnsDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma) @@ -468,32 +469,34 @@ class RowMatrix( * between columns of this matrix. */ private[mllib] def similarColumnsDIMSUM(colMags: Array[Double], - gamma: Double): CoordinateMatrix = { + gamma: Double): CoordinateMatrix = { require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma") require(colMags.size == this.numCols(), "Number of magnitudes didn't match column dimension") val sg = math.sqrt(gamma) // sqrt(gamma) used many times - val sims = rows.flatMap { row => - val buf = new ListBuffer[((Int, Int), Double)]() - row.toBreeze.activeIterator.foreach { - case (_, 0.0) => // Skip explicit zero elements. - case (i, iVal) => - val rand = new scala.util.Random(iVal.toLong) - val ci = colMags(i) - if (rand.nextDouble < sg / ci) { - row.toBreeze.activeIterator.foreach { - case (_, 0.0) => // Skip explicit zero elements. - case (j, jVal) => - val cj = colMags(j) - if (i < j && rand.nextDouble < sg / cj) { - val contrib = ((i, j), (iVal * jVal) / (math.min(sg, ci) * math.min(sg, cj))) - buf += contrib - } + val sims = rows.mapPartitionsWithIndex { (indx, iter) => + val rand = new scala.util.Random(indx) + iter.flatMap { row => + val buf = new ListBuffer[((Int, Int), Double)]() + row.toBreeze.activeIterator.foreach { + case (_, 0.0) => // Skip explicit zero elements. + case (i, iVal) => + val ci = colMags(i) + if (rand.nextDouble < sg / ci) { + row.toBreeze.activeIterator.foreach { + case (_, 0.0) => // Skip explicit zero elements. + case (j, jVal) => + val cj = colMags(j) + if (i < j && rand.nextDouble < sg / cj) { + val contrib = ((i, j), (iVal * jVal) / (math.min(sg, ci) * math.min(sg, cj))) + buf += contrib + } + } } - } + } + buf } - buf }.reduceByKey(_ + _).map { case ((i, j), sim) => MatrixEntry(i.toLong, j.toLong, sim) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 06e685c4c8edd..550655faa1fa8 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -102,17 +102,17 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { (0.0, 0.0, 78.0), (0.0, 0.0, 0.0)) - for(i <- 0 until n) for(j <- 0 until n) { + for (i <- 0 until n; j <- 0 until n) { expected(i, j) /= (colMags(i) * colMags(j)) } for (mat <- Seq(denseMat, sparseMat)) { - val G = mat.similarColumns(0.1).toBreeze() - for(i <- 0 until n) for(j <- 0 until n) { + val G = mat.similarColumns(0.11).toBreeze() + for (i <- 0 until n; j <- 0 until n) { if (expected(i, j) > 0) { val actual = expected(i, j) val estimate = G(i, j) - assert(math.abs(actual - estimate) / actual < 0.1, + assert(math.abs(actual - estimate) / actual < 0.2, s"Similarities not close enough: $actual vs $estimate") } } From f2947e49c9a67150d97ff4e24663e9e1823a549a Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 23 Sep 2014 22:16:16 -0700 Subject: [PATCH 23/27] some optimization --- .../mllib/linalg/distributed/RowMatrix.scala | 84 ++++++++++++++----- .../linalg/distributed/RowMatrixSuite.scala | 4 +- 2 files changed, 63 insertions(+), 25 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 5a04490dcff7e..def9d9569a8e4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -19,6 +19,8 @@ package org.apache.spark.mllib.linalg.distributed import java.util.Arrays +import org.apache.spark.util.random.XORShiftRandom + import scala.collection.mutable.ListBuffer import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV} @@ -411,7 +413,7 @@ class RowMatrix( * columns of this matrix. */ def columnSimilarities(): CoordinateMatrix = { - similarColumns(0.0) + columnSimilarities(0.0) } /** @@ -437,12 +439,12 @@ class RowMatrix( * * For those column pairs that are above the threshold, * the computed similarity is correct to within 20% relative error with probability - * at least 1 - (0.981)^(10/B) + * at least 1 - (0.981)^10/B^ * * The shuffle size is bounded by the *smaller* of the following two expressions: * * O(n log(n) L / (threshold * A)) - * O(m L^2) + * O(m L^2^) * * The latter is the cost of the brute-force approach, so for non-zero thresholds, * the cost is always cheaper than the brute-force approach. @@ -453,7 +455,7 @@ class RowMatrix( * @return An n x n sparse upper-triangular matrix of cosine similarities * between columns of this matrix. */ - def similarColumns(threshold: Double): CoordinateMatrix = { + def columnSimilarities(threshold: Double): CoordinateMatrix = { require(threshold >= 0 && threshold <= 1, s"Threshold not in [0,1]: $threshold") val gamma = if (threshold < 1e-6) { @@ -462,7 +464,9 @@ class RowMatrix( 10 * math.log(numCols()) / threshold } - similarColumnsDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma) + println(s"gamma: $gamma") + + columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma) } /** @@ -472,37 +476,71 @@ class RowMatrix( * http://arxiv.org/abs/1304.1467 * * @param colMags A vector of column magnitudes - * @param gamma The oversampling parameter. For provable results, set to 100 * log(n) / s, + * @param gamma The oversampling parameter. For provable results, set to 10 * log(n) / s, * where s is the smallest similarity score to be estimated, * and n is the number of columns * @return An n x n sparse upper-triangular matrix of cosine similarities * between columns of this matrix. */ - private[mllib] def similarColumnsDIMSUM(colMags: Array[Double], - gamma: Double): CoordinateMatrix = { + private[mllib] def columnSimilaritiesDIMSUM( + colMags: Array[Double], + gamma: Double): CoordinateMatrix = { require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma") require(colMags.size == this.numCols(), "Number of magnitudes didn't match column dimension") - val sg = math.sqrt(gamma) // sqrt(gamma) used many times - + val p = colMags.map(c => sg / c) + val q = colMags.map(c => math.min(sg, c)) val sims = rows.mapPartitionsWithIndex { (indx, iter) => - val rand = new scala.util.Random(indx) + val rand = new XORShiftRandom(indx) + val scaled = new Array[Double](p.size) iter.flatMap { row => val buf = new ListBuffer[((Int, Int), Double)]() - row.toBreeze.activeIterator.foreach { - case (_, 0.0) => // Skip explicit zero elements. - case (i, iVal) => - val ci = colMags(i) - if (rand.nextDouble < sg / ci) { - row.toBreeze.activeIterator.foreach { - case (_, 0.0) => // Skip explicit zero elements. - case (j, jVal) => - val cj = colMags(j) - if (i < j && rand.nextDouble < sg / cj) { - val contrib = ((i, j), (iVal * jVal) / (math.min(sg, ci) * math.min(sg, cj))) - buf += contrib + row match { + case sv: SparseVector => + val nnz = sv.indices.size + var k = 0 + while (k < nnz) { + scaled(k) = sv.values(k) / q(sv.indices(k)) + k += 1 + } + k = 0 + while (k < nnz) { + val i = sv.indices(k) + val iVal = scaled(k) + if (iVal != 0 && rand.nextDouble() < p(i)) { + var l = k + 1 + while (l < nnz) { + val j = sv.indices(l) + val jVal = scaled(l) + if (jVal != 0 && rand.nextDouble() < p(j)) { + buf += (((i, j), iVal * jVal)) + } + l += 1 + } + } + k += 1 + } + case dv: DenseVector => + val n = dv.values.size + var i = 0 + while (i < n) { + scaled(i) = dv.values(i) / q(i) + i += 1 + } + i = 0 + while (i < n) { + val iVal = scaled(i) + if (iVal != 0 && rand.nextDouble() < p(i)) { + var j = i + 1 + while (j < n) { + val jVal = scaled(j) + if (jVal != 0 && rand.nextDouble() < p(j)) { + buf += (((i, j), iVal * jVal)) } + j += 1 + } } + i += 1 } } buf diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 3dfbc68b8a902..63f3ed58c0d4d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -107,7 +107,7 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { } for (mat <- Seq(denseMat, sparseMat)) { - val G = mat.similarColumns(0.11).toBreeze() + val G = mat.columnSimilarities(0.11).toBreeze() for (i <- 0 until n; j <- 0 until n) { if (expected(i, j) > 0) { val actual = expected(i, j) @@ -124,7 +124,7 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { } for (mat <- Seq(denseMat, sparseMat)) { - val G = mat.similarColumnsDIMSUM(colMags.toArray, 150.0) + val G = mat.columnSimilaritiesDIMSUM(colMags.toArray, 150.0) assert(closeToZero(G.toBreeze() - expected)) } } From 9fe17c02f4d71fea98fbf21d6836570e5e448b94 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 25 Sep 2014 15:52:59 -0700 Subject: [PATCH 24/27] organize imports --- .../spark/mllib/linalg/distributed/RowMatrix.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index def9d9569a8e4..576896b0b4e35 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -19,22 +19,21 @@ package org.apache.spark.mllib.linalg.distributed import java.util.Arrays -import org.apache.spark.util.random.XORShiftRandom - import scala.collection.mutable.ListBuffer -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV} -import breeze.linalg.{svd => brzSvd, axpy => brzAxpy} +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV, axpy => brzAxpy, + svd => brzSvd} import breeze.numerics.{sqrt => brzSqrt} import com.github.fommil.netlib.BLAS.{getInstance => blas} +import org.apache.spark.Logging +import org.apache.spark.SparkContext._ import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg._ -import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext._ -import org.apache.spark.Logging import org.apache.spark.mllib.rdd.RDDFunctions._ import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.random.XORShiftRandom /** * :: Experimental :: From aea024727fc26a61f15acdcbe5d9dd02b49c8101 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Thu, 25 Sep 2014 17:05:45 -0700 Subject: [PATCH 25/27] Allow large thresholds to promote sparsity --- .../apache/spark/mllib/linalg/distributed/RowMatrix.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 576896b0b4e35..24d401130ac24 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -455,7 +455,13 @@ class RowMatrix( * between columns of this matrix. */ def columnSimilarities(threshold: Double): CoordinateMatrix = { - require(threshold >= 0 && threshold <= 1, s"Threshold not in [0,1]: $threshold") + require(threshold >= 0, s"Threshold cannot be negative: $threshold") + + if (threshold > 1) { + logWarning(s"Threshold is greater than 1: $threshold " + + "Computation will be more efficient with promoted sparsity, " + + " however there is no correctness guarantee.") + } val gamma = if (threshold < 1e-6) { Double.PositiveInfinity From 976ddd4d962885014923fdf7c11bd79a1598eb83 Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Thu, 25 Sep 2014 18:19:51 -0700 Subject: [PATCH 26/27] Broadcast colMags. Avoid div by zero. --- .../mllib/linalg/distributed/RowMatrix.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 46a2015b32d57..8380058cf9b41 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -480,8 +480,6 @@ class RowMatrix( 10 * math.log(numCols()) / threshold } - println(s"gamma: $gamma") - columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma) } @@ -504,9 +502,18 @@ class RowMatrix( require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma") require(colMags.size == this.numCols(), "Number of magnitudes didn't match column dimension") val sg = math.sqrt(gamma) // sqrt(gamma) used many times - val p = colMags.map(c => sg / c) - val q = colMags.map(c => math.min(sg, c)) + + // Don't divide by zero for those columns with zero magnitude + val colMagsCorrected = colMags.map(x => if (x == 0) 1.0 else x) + + val sc = rows.context + val pBV = sc.broadcast(colMagsCorrected.map(c => sg / c)) + val qBV = sc.broadcast(colMagsCorrected.map(c => math.min(sg, c))) + val sims = rows.mapPartitionsWithIndex { (indx, iter) => + val p = pBV.value + val q = qBV.value + val rand = new XORShiftRandom(indx) val scaled = new Array[Double](p.size) iter.flatMap { row => From 4eb71c63edffb4011bbfa36675193281f10189cb Mon Sep 17 00:00:00 2001 From: Reza Zadeh Date: Fri, 26 Sep 2014 11:16:59 -0700 Subject: [PATCH 27/27] Add excludes for normL1 and normL2 --- project/MimaExcludes.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 3280e662fa0b1..1adfaa18c6202 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -39,7 +39,14 @@ object MimaExcludes { MimaBuild.excludeSparkPackage("graphx") ) ++ MimaBuild.excludeSparkClass("mllib.linalg.Matrix") ++ - MimaBuild.excludeSparkClass("mllib.linalg.Vector") + MimaBuild.excludeSparkClass("mllib.linalg.Vector") ++ + Seq( + // Added normL1 and normL2 to trait MultivariateStatisticalSummary + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL1"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2") + ) case v if v.startsWith("1.1") => Seq(