From a14c0da0360b4202a2db787b85ce631562014f0d Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 17 Nov 2014 01:33:36 -0800 Subject: [PATCH 01/17] [SPARK-4409] Initial commit to add methods --- .../apache/spark/mllib/linalg/Matrices.scala | 558 +++++++++++++++++- .../spark/mllib/linalg/MatricesSuite.scala | 109 ++++ 2 files changed, 636 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index 2cc52e94282ba..d9298cacd7d99 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -20,9 +20,12 @@ package org.apache.spark.mllib.linalg import java.util.Arrays import breeze.linalg.{Matrix => BM, DenseMatrix => BDM, CSCMatrix => BSM} +import org.apache.spark.util.Utils import org.apache.spark.util.random.XORShiftRandom +import scala.collection.mutable.ArrayBuffer + /** * Trait for a local matrix. */ @@ -82,6 +85,12 @@ sealed trait Matrix extends Serializable { /** A human readable representation of the matrix */ override def toString: String = toBreeze.toString() + + /** Map the values of this matrix using a function. Generates a new matrix. */ + private[mllib] def map(f: Double => Double): Matrix + + /** Update all the values of this matrix using the function f. Performed in-place. */ + private[mllib] def update(f: Double => Double): Matrix } /** @@ -125,6 +134,119 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) } override def copy = new DenseMatrix(numRows, numCols, values.clone()) + + private[mllib] def map(f: Double => Double) = new DenseMatrix(numRows, numCols, values.map(f)) + + private[mllib] def update(f: Double => Double): DenseMatrix = { + val len = values.length + var i = 0 + while (i < len) { + values(i) = f(values(i)) + i += 1 + } + this + } +} + +/** + * Factory methods for [[org.apache.spark.mllib.linalg.DenseMatrix]]. + */ +object DenseMatrix { + + /** + * Generate a `DenseMatrix` consisting of zeros. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @return `DenseMatrix` with size `numRows` x `numCols` and values of zeros + */ + def zeros(numRows: Int, numCols: Int): DenseMatrix = + new DenseMatrix(numRows, numCols, new Array[Double](numRows * numCols)) + + /** + * Generate a `DenseMatrix` consisting of ones. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @return `DenseMatrix` with size `numRows` x `numCols` and values of ones + */ + def ones(numRows: Int, numCols: Int): DenseMatrix = + new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(1.0)) + + /** + * Generate an Identity Matrix in `DenseMatrix` format. + * @param n number of rows and columns of the matrix + * @return `DenseMatrix` with size `n` x `n` and values of ones on the diagonal + */ + def eye(n: Int): DenseMatrix = { + val identity = DenseMatrix.zeros(n, n) + var i = 0 + while (i < n){ + identity.update(i, i, 1.0) + i += 1 + } + identity + } + + /** + * Generate a `DenseMatrix` consisting of i.i.d. uniform random numbers. 
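+ * A minimal usage sketch (illustrative): with a fixed seed the result is reproducible, e.g.
+ * {{{
+ *   val m = DenseMatrix.rand(2, 3, 42L)  // 2 x 3 matrix with entries drawn from U(0, 1)
+ * }}}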
+ * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param seed the seed seed for the random number generator + * @return `DenseMatrix` with size `numRows` x `numCols` and values in U(0, 1) + */ + def rand(numRows: Int, numCols: Int, seed: Long): DenseMatrix = { + val rand = new XORShiftRandom(seed) + new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rand.nextDouble())) + } + + /** + * Generate a `DenseMatrix` consisting of i.i.d. uniform random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @return `DenseMatrix` with size `numRows` x `numCols` and values in U(0, 1) + */ + def rand(numRows: Int, numCols: Int): DenseMatrix = { + rand(numRows, numCols, Utils.random.nextLong()) + } + + /** + * Generate a `DenseMatrix` consisting of i.i.d. gaussian random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param seed the seed seed for the random number generator + * @return `DenseMatrix` with size `numRows` x `numCols` and values in N(0, 1) + */ + def randn(numRows: Int, numCols: Int, seed: Long): DenseMatrix = { + val rand = new XORShiftRandom(seed) + new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rand.nextGaussian())) + } + + /** + * Generate a `DenseMatrix` consisting of i.i.d. gaussian random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @return `DenseMatrix` with size `numRows` x `numCols` and values in N(0, 1) + */ + def randn(numRows: Int, numCols: Int): DenseMatrix = { + randn(numRows, numCols, Utils.random.nextLong()) + } + + /** + * Generate a diagonal matrix in `DenseMatrix` format from the supplied values. + * @param vector a `Vector` that will form the values on the diagonal of the matrix + * @return Square `DenseMatrix` with size `values.length` x `values.length` and `values` + * on the diagonal + */ + def diag(vector: Vector): DenseMatrix = { + val n = vector.size + val matrix = DenseMatrix.eye(n) + val values = vector.toArray + var i = 0 + while (i < n) { + matrix.update(i, i, values(i)) + i += 1 + } + matrix + } } /** @@ -199,6 +321,197 @@ class SparseMatrix( } override def copy = new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values.clone()) + + private[mllib] def map(f: Double => Double) = + new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values.map(f)) + + private[mllib] def update(f: Double => Double): SparseMatrix = { + val len = values.length + var i = 0 + while (i < len) { + values(i) = f(values(i)) + i += 1 + } + this + } +} + +/** + * Factory methods for [[org.apache.spark.mllib.linalg.SparseMatrix]]. + */ +object SparseMatrix { + + /** + * Generate an Identity Matrix in `SparseMatrix` format. + * @param n number of rows and columns of the matrix + * @return `SparseMatrix` with size `n` x `n` and values of ones on the diagonal + */ + def speye(n: Int): SparseMatrix = { + new SparseMatrix(n, n, (0 to n).toArray, (0 until n).toArray, Array.fill(n)(1.0)) + } + + /** Generates a SparseMatrix given an Array[Double] of size numRows * numCols. The number of + * non-zeros in `raw` is provided for efficiency. 
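+ * For example (illustrative), `raw = Array(1.0, 0.0, 0.0, 2.0)` stored column-major with
+ * `numRows = 2` and `numCols = 2` yields `values = Array(1.0, 2.0)`, `rowIndices = Array(0, 1)`
+ * and `colPtrs = Array(0, 1, 2)`.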
*/ + private def genRand(numRows: Int, numCols: Int, raw: Array[Double], nonZero: Int): SparseMatrix = { + val sparseA: ArrayBuffer[Double] = new ArrayBuffer(nonZero) + + val sCols: ArrayBuffer[Int] = new ArrayBuffer(numCols + 1) + val sRows: ArrayBuffer[Int] = new ArrayBuffer(nonZero) + + var i = 0 + var nnz = 0 + var lastCol = -1 + + raw.foreach { v => + val r = i % numRows + val c = (i - r) / numRows + if ( v != 0.0) { + sRows.append(r) + sparseA.append(v) + while (c != lastCol){ + sCols.append(nnz) + lastCol += 1 + } + nnz += 1 + } + i += 1 + } + sCols.append(sparseA.length) + new SparseMatrix(numRows, numCols, sCols.toArray, sRows.toArray, sparseA.toArray) + } + + /** + * Generate a `SparseMatrix` consisting of i.i.d. uniform random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param density the desired density for the matrix + * @param seed the seed for the random generator + * @return `SparseMatrix` with size `numRows` x `numCols` and values in U(0, 1) + */ + def sprand( + numRows: Int, + numCols: Int, + density: Double, + seed: Long): SparseMatrix = { + + require(density > 0.0 && density < 1.0, "density must be a double in the range " + + s"0.0 < d < 1.0. Currently, density: $density") + val rand = new XORShiftRandom(seed) + val length = numRows * numCols + val rawA = Array.fill(length)(0.0) + var nnz = 0 + for (i <- 0 until length) { + val p = rand.nextDouble() + if (p < density) { + rawA.update(i, rand.nextDouble()) + nnz += 1 + } + } + genRand(numRows, numCols, rawA, nnz) + } + + /** + * Generate a `SparseMatrix` consisting of i.i.d. uniform random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param density the desired density for the matrix + * @return `SparseMatrix` with size `numRows` x `numCols` and values in U(0, 1) + */ + def sprand(numRows: Int, numCols: Int, density: Double): SparseMatrix = { + sprand(numRows, numCols, density, Utils.random.nextLong()) + } + + /** + * Generate a `SparseMatrix` consisting of i.i.d. gaussian random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param density the desired density for the matrix + * @param seed the seed for the random generator + * @return `SparseMatrix` with size `numRows` x `numCols` and values in N(0, 1) + */ + def sprandn( + numRows: Int, + numCols: Int, + density: Double, + seed: Long): SparseMatrix = { + + require(density > 0.0 && density < 1.0, "density must be a double in the range " + + s"0.0 < d < 1.0. Currently, density: $density") + val rand = new XORShiftRandom(seed) + val length = numRows * numCols + val rawA = Array.fill(length)(0.0) + var nnz = 0 + for (i <- 0 until length) { + val p = rand.nextDouble() + if (p < density) { + rawA.update(i, rand.nextGaussian()) + nnz += 1 + } + } + genRand(numRows, numCols, rawA, nnz) + } + + /** + * Generate a `SparseMatrix` consisting of i.i.d. gaussian random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param density the desired density for the matrix + * @return `SparseMatrix` with size `numRows` x `numCols` and values in N(0, 1) + */ + def sprandn(numRows: Int, numCols: Int, density: Double): SparseMatrix = { + sprandn(numRows, numCols, density, Utils.random.nextLong()) + } + + /** + * Generate a diagonal matrix in `DenseMatrix` format from the supplied values. 
+ * @param vector a `Vector` that will form the values on the diagonal of the matrix + * @return Square `SparseMatrix` with size `values.length` x `values.length` and non-zero `values` + * on the diagonal + */ + def diag(vector: Vector): SparseMatrix = { + val n = vector.size + vector match { + case sVec: SparseVector => + val rows = sVec.indices + val values = sVec.values + var i = 0 + var lastCol = -1 + val colPtrs = new ArrayBuffer[Int](n) + rows.foreach { r => + while (r != lastCol) { + colPtrs.append(i) + lastCol += 1 + } + i += 1 + } + colPtrs.append(n) + new SparseMatrix(n, n, colPtrs.toArray, rows, values) + case dVec: DenseVector => + val values = dVec.values + var i = 0 + var nnz = 0 + val sVals = values.filter( v => v != 0.0) + var lastCol = -1 + val colPtrs = new ArrayBuffer[Int](n + 1) + val sRows = new ArrayBuffer[Int](sVals.length) + values.foreach { v => + if (v != 0.0) { + sRows.append(i) + while (lastCol != i) { + colPtrs.append(nnz) + lastCol += 1 + } + nnz += 1 + } + i += 1 + } + while (lastCol != i) { + colPtrs.append(nnz) + lastCol += 1 + } + new SparseMatrix(n, n, colPtrs.toArray, sRows.toArray, sVals) + } + } } /** @@ -260,8 +573,7 @@ object Matrices { * @param numCols number of columns of the matrix * @return `DenseMatrix` with size `numRows` x `numCols` and values of zeros */ - def zeros(numRows: Int, numCols: Int): Matrix = - new DenseMatrix(numRows, numCols, new Array[Double](numRows * numCols)) + def zeros(numRows: Int, numCols: Int): Matrix = DenseMatrix.zeros(numRows, numCols) /** * Generate a `DenseMatrix` consisting of ones. @@ -269,23 +581,31 @@ object Matrices { * @param numCols number of columns of the matrix * @return `DenseMatrix` with size `numRows` x `numCols` and values of ones */ - def ones(numRows: Int, numCols: Int): Matrix = - new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(1.0)) + def ones(numRows: Int, numCols: Int): Matrix = DenseMatrix.ones(numRows, numCols) /** - * Generate an Identity Matrix in `DenseMatrix` format. + * Generate a dense Identity Matrix in `Matrix` format. * @param n number of rows and columns of the matrix * @return `DenseMatrix` with size `n` x `n` and values of ones on the diagonal */ - def eye(n: Int): Matrix = { - val identity = Matrices.zeros(n, n) - var i = 0 - while (i < n){ - identity.update(i, i, 1.0) - i += 1 - } - identity - } + def eye(n: Int): Matrix = DenseMatrix.eye(n) + + /** + * Generate a sparse Identity Matrix in `Matrix` format. + * @param n number of rows and columns of the matrix + * @return `SparseMatrix` with size `n` x `n` and values of ones on the diagonal + */ + def speye(n: Int): Matrix = SparseMatrix.speye(n) + + /** + * Generate a dense `Matrix` consisting of i.i.d. uniform random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param seed the seed for the random generator + * @return `DenseMatrix` with size `numRows` x `numCols` and values in U(0, 1) + */ + def rand(numRows: Int, numCols: Int, seed: Long): Matrix = + DenseMatrix.rand(numRows, numCols, seed) /** * Generate a `DenseMatrix` consisting of i.i.d. uniform random numbers. 
@@ -293,21 +613,67 @@ object Matrices { * @param numCols number of columns of the matrix * @return `DenseMatrix` with size `numRows` x `numCols` and values in U(0, 1) */ - def rand(numRows: Int, numCols: Int): Matrix = { - val rand = new XORShiftRandom - new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rand.nextDouble())) - } + def rand(numRows: Int, numCols: Int): Matrix = DenseMatrix.rand(numRows, numCols) + + /** + * Generate a `SparseMatrix` consisting of i.i.d. gaussian random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param density the desired density for the matrix + * @param seed the seed for the random generator + * @return `Matrix` with size `numRows` x `numCols` and values in U(0, 1) + */ + def sprand(numRows: Int, numCols: Int, density: Double, seed: Long): Matrix = + SparseMatrix.sprand(numRows, numCols, density, seed) + + /** + * Generate a `SparseMatrix` consisting of i.i.d. gaussian random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param density the desired density for the matrix + * @return `Matrix` with size `numRows` x `numCols` and values in U(0, 1) + */ + def sprand(numRows: Int, numCols: Int, density: Double): Matrix = + SparseMatrix.sprand(numRows, numCols, density) /** * Generate a `DenseMatrix` consisting of i.i.d. gaussian random numbers. * @param numRows number of rows of the matrix * @param numCols number of columns of the matrix + * @param seed the seed for the random generator * @return `DenseMatrix` with size `numRows` x `numCols` and values in N(0, 1) */ - def randn(numRows: Int, numCols: Int): Matrix = { - val rand = new XORShiftRandom - new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rand.nextGaussian())) - } + def randn(numRows: Int, numCols: Int, seed: Long): Matrix = + DenseMatrix.randn(numRows, numCols, seed) + + /** + * Generate a `DenseMatrix` consisting of i.i.d. gaussian random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @return `DenseMatrix` with size `numRows` x `numCols` and values in N(0, 1) + */ + def randn(numRows: Int, numCols: Int): Matrix = DenseMatrix.randn(numRows, numCols) + + /** + * Generate a `SparseMatrix` consisting of i.i.d. gaussian random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param density the desired density for the matrix + * @param seed the seed for the random generator + * @return `Matrix` with size `numRows` x `numCols` and values in N(0, 1) + */ + def sprandn(numRows: Int, numCols: Int, density: Double, seed: Long): Matrix = + SparseMatrix.sprandn(numRows, numCols, density, seed) + + /** + * Generate a `SparseMatrix` consisting of i.i.d. gaussian random numbers. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param density the desired density for the matrix + * @return `Matrix` with size `numRows` x `numCols` and values in N(0, 1) + */ + def sprandn(numRows: Int, numCols: Int, density: Double): Matrix = + SparseMatrix.sprandn(numRows, numCols, density) /** * Generate a diagonal matrix in `DenseMatrix` format from the supplied values. 
@@ -315,15 +681,145 @@ object Matrices { * @return Square `DenseMatrix` with size `values.length` x `values.length` and `values` * on the diagonal */ - def diag(vector: Vector): Matrix = { - val n = vector.size - val matrix = Matrices.eye(n) - val values = vector.toArray - var i = 0 - while (i < n) { - matrix.update(i, i, values(i)) - i += 1 + def diag(vector: Vector): Matrix = DenseMatrix.diag(vector) + + /** + * Horizontally concatenate a sequence of matrices. The returned matrix will be in the format + * the matrices are supplied in. Supplying a mix of dense and sparse matrices will result in + * a dense matrix. + * @param matrices sequence of matrices + * @return a single `Matrix` composed of the matrices that were horizontally concatenated + */ + private[mllib] def horzCat(matrices: Seq[Matrix]): Matrix = { + if (matrices.size == 1) { + return matrices(0) + } + val numRows = matrices(0).numRows + var rowsMatch = true + var isDense = false + var isSparse = false + var numCols = 0 + matrices.foreach { mat => + if (numRows != mat.numRows) rowsMatch = false + mat match { + case sparse: SparseMatrix => isSparse = true + case dense: DenseMatrix => isDense = true + } + numCols += mat.numCols + } + require(rowsMatch, "The number of rows of the matrices in this sequence, don't match!") + + if (isSparse && !isDense) { + val allColPtrs: Array[(Int, Int)] = Array((0, 0)) ++ + matrices.zipWithIndex.flatMap { case (mat, ind) => + val ptr = mat.asInstanceOf[SparseMatrix].colPtrs + ptr.slice(1, ptr.length).map(p => (ind, p)) + } + var counter = 0 + var lastIndex = 0 + var lastPtr = 0 + val adjustedPtrs = allColPtrs.map { case (ind, p) => + if (ind != lastIndex) { + counter += lastPtr + lastIndex = ind + } + lastPtr = p + counter + p + } + new SparseMatrix(numRows, numCols, adjustedPtrs, + matrices.flatMap(_.asInstanceOf[SparseMatrix].rowIndices).toArray, + matrices.flatMap(_.asInstanceOf[SparseMatrix].values).toArray) + } else if (!isSparse && !isDense) { + throw new IllegalArgumentException("The supplied matrices are neither in SparseMatrix or" + + " DenseMatrix format!") + }else { + new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray).toArray) + } + } + + /** + * Vertically concatenate a sequence of matrices. The returned matrix will be in the format + * the matrices are supplied in. Supplying a mix of dense and sparse matrices will result in + * a dense matrix. 
+ * @param matrices sequence of matrices + * @return a single `Matrix` composed of the matrices that were horizontally concatenated + */ + private[mllib] def vertCat(matrices: Seq[Matrix]): Matrix = { + if (matrices.size == 1) { + return matrices(0) + } + val numCols = matrices(0).numCols + var colsMatch = true + var isDense = false + var isSparse = false + var numRows = 0 + var valsLength = 0 + matrices.foreach { mat => + if (numCols != mat.numCols) colsMatch = false + mat match { + case sparse: SparseMatrix => + isSparse = true + valsLength += sparse.values.length + case dense: DenseMatrix => + isDense = true + valsLength += dense.values.length + } + numRows += mat.numRows + + } + require(colsMatch, "The number of rows of the matrices in this sequence, don't match!") + + if (isSparse && !isDense) { + val matMap = matrices.zipWithIndex.map(d => (d._2, d._1.asInstanceOf[SparseMatrix])).toMap + // (matrixInd, colInd, colStart, colEnd, numRows) + val allColPtrs: Seq[(Int, Int, Int, Int, Int)] = + matMap.flatMap { case (ind, mat) => + val ptr = mat.colPtrs + var colStart = 0 + var j = 0 + ptr.slice(1, ptr.length).map { p => + j += 1 + val oldColStart = colStart + colStart = p + (j - 1, ind, oldColStart, p, mat.numRows) + } + }.toSeq + val values = new ArrayBuffer[Double](valsLength) + val rowInd = new ArrayBuffer[Int](valsLength) + val newColPtrs = new Array[Int](numCols) + + // group metadata by column index and then sort in increasing order of column index + allColPtrs.groupBy(_._1).toArray.sortBy(_._1).foreach { case (colInd, data) => + // then sort by matrix index + val sortedPtrs = data.sortBy(_._1) + var startRow = 0 + sortedPtrs.foreach { case (colIdx, matrixInd, colStart, colEnd, nRows) => + val selectedMatrix = matMap(matrixInd) + val selectedValues = selectedMatrix.values.slice(colStart, colEnd) + val selectedRowIdx = selectedMatrix.rowIndices.slice(colStart, colEnd) + val len = selectedValues.length + newColPtrs(colIdx) += len + var i = 0 + while (i < len) { + values.append(selectedValues(i)) + rowInd.append(selectedRowIdx(i) + startRow) + i += 1 + } + startRow += nRows + } + } + val adjustedPtrs = newColPtrs.scanLeft(0)(_ + _) + new SparseMatrix(numRows, numCols, adjustedPtrs, rowInd.toArray, values.toArray) + } else if (!isSparse && !isDense) { + throw new IllegalArgumentException("The supplied matrices are neither in SparseMatrix or" + + " DenseMatrix format!") + }else { + val matData = matrices.zipWithIndex.flatMap { case (mat, ind) => + val values = mat.toArray + for (j <- 0 until numCols) yield (j, ind, + values.slice(j * mat.numRows, (j + 1) * mat.numRows)) + }.sortBy(x => (x._1, x._2)) + new DenseMatrix(numRows, numCols, matData.flatMap(_._3).toArray) } - matrix } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala index 5f8b8c4b72697..2793e9aaef86d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala @@ -112,4 +112,113 @@ class MatricesSuite extends FunSuite { assert(sparseMat(0, 1) === 10.0) assert(sparseMat.values(2) === 10.0) } + + test("map, update") { + val m = 3 + val n = 2 + val values = Array(1.0, 2.0, 4.0, 5.0) + val allValues = Array(1.0, 2.0, 0.0, 0.0, 4.0, 5.0) + val colPtrs = Array(0, 2, 4) + val rowIndices = Array(0, 1, 1, 2) + + val spMat1 = new SparseMatrix(m, n, colPtrs, rowIndices, values) + val deMat1 = new DenseMatrix(m, n, allValues) + val 
deMat2 = deMat1.map(_ * 2) + val spMat2 = spMat1.map(_ * 2) + deMat1.update(_ * 2) + spMat1.update(_ * 2) + + assert(spMat1.toArray === spMat2.toArray) + assert(deMat1.toArray === deMat2.toArray) + } + + test("horzCat, vertCat, eye, speye") { + val m = 3 + val n = 2 + val values = Array(1.0, 2.0, 4.0, 5.0) + val allValues = Array(1.0, 2.0, 0.0, 0.0, 4.0, 5.0) + val colPtrs = Array(0, 2, 4) + val rowIndices = Array(0, 1, 1, 2) + + val spMat1 = new SparseMatrix(m, n, colPtrs, rowIndices, values) + val deMat1 = new DenseMatrix(m, n, allValues) + val deMat2 = Matrices.eye(3) + val spMat2 = Matrices.speye(3) + val deMat3 = Matrices.eye(2) + val spMat3 = Matrices.speye(2) + + val spHorz = Matrices.horzCat(Seq(spMat1, spMat2)) + val deHorz1 = Matrices.horzCat(Seq(deMat1, deMat2)) + val deHorz2 = Matrices.horzCat(Seq(spMat1, deMat2)) + val deHorz3 = Matrices.horzCat(Seq(deMat1, spMat2)) + + assert(deHorz1.numRows === 3) + assert(deHorz2.numRows === 3) + assert(deHorz3.numRows === 3) + assert(spHorz.numRows === 3) + assert(deHorz1.numCols === 5) + assert(deHorz2.numCols === 5) + assert(deHorz3.numCols === 5) + assert(spHorz.numCols === 5) + + assert(deHorz1 === deHorz2) + assert(deHorz2 === deHorz3) + assert(spHorz(0, 0) === 1.0) + assert(spHorz(2, 1) === 5.0) + assert(spHorz(0, 2) === 1.0) + assert(spHorz(1, 2) === 0.0) + assert(spHorz(1, 3) === 1.0) + assert(spHorz(2, 4) === 1.0) + assert(spHorz(1, 4) === 0.0) + assert(deHorz1(0, 0) === 1.0) + assert(deHorz1(2, 1) === 5.0) + assert(deHorz1(0, 2) === 1.0) + assert(deHorz1(1, 2) === 0.0) + assert(deHorz1(1, 3) === 1.0) + assert(deHorz1(2, 4) === 1.0) + assert(deHorz1(1, 4) === 0.0) + + intercept[IllegalArgumentException] { + Matrices.horzCat(Seq(spMat1, spMat3)) + } + + intercept[IllegalArgumentException] { + Matrices.horzCat(Seq(deMat1, spMat3)) + } + + val spVert = Matrices.vertCat(Seq(spMat1, spMat3)) + val deVert1 = Matrices.vertCat(Seq(deMat1, deMat3)) + val deVert2 = Matrices.vertCat(Seq(spMat1, deMat3)) + val deVert3 = Matrices.vertCat(Seq(deMat1, spMat3)) + + assert(deVert1.numRows === 5) + assert(deVert2.numRows === 5) + assert(deVert3.numRows === 5) + assert(spVert.numRows === 5) + assert(deVert1.numCols === 2) + assert(deVert2.numCols === 2) + assert(deVert3.numCols === 2) + assert(spVert.numCols === 2) + + assert(deVert1 === deVert2) + assert(deVert2 === deVert3) + assert(spVert(0, 0) === 1.0) + assert(spVert(2, 1) === 5.0) + assert(spVert(3, 0) === 1.0) + assert(spVert(3, 1) === 0.0) + assert(spVert(4, 1) === 1.0) + assert(deVert1(0, 0) === 1.0) + assert(deVert1(2, 1) === 5.0) + assert(deVert1(3, 0) === 1.0) + assert(deVert1(3, 1) === 0.0) + assert(deVert1(4, 1) === 1.0) + + intercept[IllegalArgumentException] { + Matrices.vertCat(Seq(spMat1, spMat2)) + } + + intercept[IllegalArgumentException] { + Matrices.vertCat(Seq(deMat1, spMat2)) + } + } } From 83dfe37e6ea541899bb1ae76443395a4e94cc17d Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 17 Nov 2014 11:46:32 -0800 Subject: [PATCH 02/17] [SPARK-4409] Scalastyle error fixed --- .../org/apache/spark/mllib/linalg/Matrices.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index d9298cacd7d99..1952493498c6b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -352,9 +352,12 @@ object SparseMatrix { /** Generates a 
SparseMatrix given an Array[Double] of size numRows * numCols. The number of * non-zeros in `raw` is provided for efficiency. */ - private def genRand(numRows: Int, numCols: Int, raw: Array[Double], nonZero: Int): SparseMatrix = { + private def genRand( + numRows: Int, + numCols: Int, + raw: Array[Double], + nonZero: Int): SparseMatrix = { val sparseA: ArrayBuffer[Double] = new ArrayBuffer(nonZero) - val sCols: ArrayBuffer[Int] = new ArrayBuffer(numCols + 1) val sRows: ArrayBuffer[Int] = new ArrayBuffer(nonZero) @@ -393,7 +396,6 @@ object SparseMatrix { numCols: Int, density: Double, seed: Long): SparseMatrix = { - require(density > 0.0 && density < 1.0, "density must be a double in the range " + s"0.0 < d < 1.0. Currently, density: $density") val rand = new XORShiftRandom(seed) @@ -434,7 +436,6 @@ object SparseMatrix { numCols: Int, density: Double, seed: Long): SparseMatrix = { - require(density > 0.0 && density < 1.0, "density must be a double in the range " + s"0.0 < d < 1.0. Currently, density: $density") val rand = new XORShiftRandom(seed) @@ -465,8 +466,8 @@ object SparseMatrix { /** * Generate a diagonal matrix in `DenseMatrix` format from the supplied values. * @param vector a `Vector` that will form the values on the diagonal of the matrix - * @return Square `SparseMatrix` with size `values.length` x `values.length` and non-zero `values` - * on the diagonal + * @return Square `SparseMatrix` with size `values.length` x `values.length` and non-zero + * `values` on the diagonal */ def diag(vector: Vector): SparseMatrix = { val n = vector.size From d662f9d963c21aca720bab87a8279a938e1d924e Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 17 Nov 2014 11:49:14 -0800 Subject: [PATCH 03/17] [SPARK-4409] Modified according to remote repo --- .../main/scala/org/apache/spark/mllib/linalg/Matrices.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index 1952493498c6b..7e448b9e3af6f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -17,13 +17,12 @@ package org.apache.spark.mllib.linalg -import java.util.Arrays - import breeze.linalg.{Matrix => BM, DenseMatrix => BDM, CSCMatrix => BSM} -import org.apache.spark.util.Utils import org.apache.spark.util.random.XORShiftRandom +import org.apache.spark.util.Utils +import java.util.Arrays import scala.collection.mutable.ArrayBuffer /** From c75f3cdec438042c10e31009dee87a14fdce4053 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 25 Nov 2014 18:03:45 -0800 Subject: [PATCH 04/17] [SPARK-4409] Added JavaAPI Tests, and fixed a couple of bugs --- .../apache/spark/mllib/linalg/Matrices.scala | 29 ++-- .../spark/mllib/linalg/JavaMatricesSuite.java | 133 ++++++++++++++++++ .../spark/mllib/linalg/MatricesSuite.scala | 26 ++-- 3 files changed, 163 insertions(+), 25 deletions(-) create mode 100644 mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index 7e448b9e3af6f..a6f3343f56b86 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -363,7 +363,6 @@ object SparseMatrix { var i = 0 var nnz = 0 var lastCol = -1 - raw.foreach { v 
=> val r = i % numRows val c = (i - r) / numRows @@ -378,7 +377,10 @@ object SparseMatrix { } i += 1 } - sCols.append(sparseA.length) + while (numCols > lastCol){ + sCols.append(sparseA.length) + lastCol += 1 + } new SparseMatrix(numRows, numCols, sCols.toArray, sRows.toArray, sparseA.toArray) } @@ -399,11 +401,11 @@ object SparseMatrix { s"0.0 < d < 1.0. Currently, density: $density") val rand = new XORShiftRandom(seed) val length = numRows * numCols - val rawA = Array.fill(length)(0.0) + val rawA = new Array[Double](length) var nnz = 0 for (i <- 0 until length) { val p = rand.nextDouble() - if (p < density) { + if (p <= density) { rawA.update(i, rand.nextDouble()) nnz += 1 } @@ -439,11 +441,11 @@ object SparseMatrix { s"0.0 < d < 1.0. Currently, density: $density") val rand = new XORShiftRandom(seed) val length = numRows * numCols - val rawA = Array.fill(length)(0.0) + val rawA = new Array[Double](length) var nnz = 0 for (i <- 0 until length) { val p = rand.nextDouble() - if (p < density) { + if (p <= density) { rawA.update(i, rand.nextGaussian()) nnz += 1 } @@ -476,7 +478,7 @@ object SparseMatrix { val values = sVec.values var i = 0 var lastCol = -1 - val colPtrs = new ArrayBuffer[Int](n) + val colPtrs = new ArrayBuffer[Int](n + 1) rows.foreach { r => while (r != lastCol) { colPtrs.append(i) @@ -484,13 +486,16 @@ object SparseMatrix { } i += 1 } - colPtrs.append(n) + while (n > lastCol) { + colPtrs.append(i) + lastCol += 1 + } new SparseMatrix(n, n, colPtrs.toArray, rows, values) case dVec: DenseVector => val values = dVec.values var i = 0 var nnz = 0 - val sVals = values.filter( v => v != 0.0) + val sVals = values.filter(v => v != 0.0) var lastCol = -1 val colPtrs = new ArrayBuffer[Int](n + 1) val sRows = new ArrayBuffer[Int](sVals.length) @@ -687,10 +692,10 @@ object Matrices { * Horizontally concatenate a sequence of matrices. The returned matrix will be in the format * the matrices are supplied in. Supplying a mix of dense and sparse matrices will result in * a dense matrix. - * @param matrices sequence of matrices + * @param matrices array of matrices * @return a single `Matrix` composed of the matrices that were horizontally concatenated */ - private[mllib] def horzCat(matrices: Seq[Matrix]): Matrix = { + def horzcat(matrices: Array[Matrix]): Matrix = { if (matrices.size == 1) { return matrices(0) } @@ -744,7 +749,7 @@ object Matrices { * @param matrices sequence of matrices * @return a single `Matrix` composed of the matrices that were horizontally concatenated */ - private[mllib] def vertCat(matrices: Seq[Matrix]): Matrix = { + def vertcat(matrices: Array[Matrix]): Matrix = { if (matrices.size == 1) { return matrices(0) } diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java new file mode 100644 index 0000000000000..e938071d5c3fb --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg; + +import static org.junit.Assert.*; +import org.junit.Test; + +import java.io.Serializable; + +public class JavaMatricesSuite implements Serializable { + + @Test + public void randMatrixConstruction() { + Matrix r = Matrices.rand(3, 4, 24); + DenseMatrix dr = DenseMatrix.rand(3, 4, 24); + assertArrayEquals(r.toArray(), dr.toArray(), 0.0); + + Matrix rn = Matrices.randn(3, 4, 24); + DenseMatrix drn = DenseMatrix.randn(3, 4, 24); + assertArrayEquals(rn.toArray(), drn.toArray(), 0.0); + + Matrix s = Matrices.sprand(3, 4, 0.5, 24); + SparseMatrix sr = SparseMatrix.sprand(3, 4, 0.5, 24); + assertArrayEquals(s.toArray(), sr.toArray(), 0.0); + + Matrix sn = Matrices.sprandn(3, 4, 0.5, 24); + SparseMatrix srn = SparseMatrix.sprandn(3, 4, 0.5, 24); + assertArrayEquals(sn.toArray(), srn.toArray(), 0.0); + } + + @Test + public void identityMatrixConstruction() { + Matrix r = Matrices.eye(2); + DenseMatrix dr = DenseMatrix.eye(2); + SparseMatrix sr = SparseMatrix.speye(2); + assertArrayEquals(r.toArray(), dr.toArray(), 0.0); + assertArrayEquals(sr.toArray(), dr.toArray(), 0.0); + assertArrayEquals(r.toArray(), new double[]{1.0, 0.0, 0.0, 1.0}, 0.0); + } + + @Test + public void diagonalMatrixConstruction() { + Vector v = Vectors.dense(1.0, 0.0, 2.0); + Vector sv = Vectors.sparse(3, new int[]{0, 2}, new double[]{1.0, 2.0}); + + Matrix m = Matrices.diag(v); + Matrix sm = Matrices.diag(sv); + DenseMatrix d = DenseMatrix.diag(v); + DenseMatrix sd = DenseMatrix.diag(sv); + SparseMatrix s = SparseMatrix.diag(v); + SparseMatrix ss = SparseMatrix.diag(sv); + + assertArrayEquals(m.toArray(), sm.toArray(), 0.0); + assertArrayEquals(d.toArray(), sm.toArray(), 0.0); + assertArrayEquals(d.toArray(), sd.toArray(), 0.0); + assertArrayEquals(sd.toArray(), s.toArray(), 0.0); + assertArrayEquals(s.toArray(), ss.toArray(), 0.0); + assertArrayEquals(s.values(), ss.values(), 0.0); + assert(s.values().length == 2); + assert(ss.values().length == 2); + assert(s.colPtrs().length == 2); + assert(ss.colPtrs().length == 2); + } + + @Test + public void zerosMatrixConstruction() { + Matrix z = Matrices.zeros(2, 2); + Matrix one = Matrices.ones(2, 2); + DenseMatrix dz = DenseMatrix.zeros(2, 2); + DenseMatrix done = DenseMatrix.ones(2, 2); + + assertArrayEquals(z.toArray(), new double[]{0.0, 0.0, 0.0, 0.0}, 0.0); + assertArrayEquals(dz.toArray(), new double[]{0.0, 0.0, 0.0, 0.0}, 0.0); + assertArrayEquals(one.toArray(), new double[]{1.0, 1.0, 1.0, 1.0}, 0.0); + assertArrayEquals(done.toArray(), new double[]{1.0, 1.0, 1.0, 1.0}, 0.0); + } + + @Test + public void concatenateMatrices() { + int m = 3; + int n = 2; + + SparseMatrix spMat1 = SparseMatrix.sprand(m, n, 0.5, 42); + DenseMatrix deMat1 = DenseMatrix.rand(m, n, 42); + Matrix deMat2 = Matrices.eye(3); + Matrix spMat2 = Matrices.speye(3); + Matrix deMat3 = Matrices.eye(2); + Matrix spMat3 = Matrices.speye(2); + + Matrix spHorz = Matrices.horzcat(new Matrix[]{spMat1, spMat2}); + Matrix deHorz1 = Matrices.horzcat(new Matrix[]{deMat1, deMat2}); + Matrix deHorz2 = Matrices.horzcat(new Matrix[]{spMat1, deMat2}); + Matrix 
deHorz3 = Matrices.horzcat(new Matrix[]{deMat1, spMat2}); + + assert(deHorz1.numRows() == 3); + assert(deHorz2.numRows() == 3); + assert(deHorz3.numRows() == 3); + assert(spHorz.numRows() == 3); + assert(deHorz1.numCols() == 5); + assert(deHorz2.numCols() == 5); + assert(deHorz3.numCols() == 5); + assert(spHorz.numCols() == 5); + + Matrix spVert = Matrices.vertcat(new Matrix[]{spMat1, spMat3}); + Matrix deVert1 = Matrices.vertcat(new Matrix[]{deMat1, deMat3}); + Matrix deVert2 = Matrices.vertcat(new Matrix[]{spMat1, deMat3}); + Matrix deVert3 = Matrices.vertcat(new Matrix[]{deMat1, spMat3}); + + assert(deVert1.numRows() == 5); + assert(deVert2.numRows() == 5); + assert(deVert3.numRows() == 5); + assert(spVert.numRows() == 5); + assert(deVert1.numCols() == 2); + assert(deVert2.numCols() == 2); + assert(deVert3.numCols() == 2); + assert(spVert.numCols() == 2); + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala index 2793e9aaef86d..ef6c4a3974bf3 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala @@ -132,7 +132,7 @@ class MatricesSuite extends FunSuite { assert(deMat1.toArray === deMat2.toArray) } - test("horzCat, vertCat, eye, speye") { + test("horzcat, vertcat, eye, speye") { val m = 3 val n = 2 val values = Array(1.0, 2.0, 4.0, 5.0) @@ -147,10 +147,10 @@ class MatricesSuite extends FunSuite { val deMat3 = Matrices.eye(2) val spMat3 = Matrices.speye(2) - val spHorz = Matrices.horzCat(Seq(spMat1, spMat2)) - val deHorz1 = Matrices.horzCat(Seq(deMat1, deMat2)) - val deHorz2 = Matrices.horzCat(Seq(spMat1, deMat2)) - val deHorz3 = Matrices.horzCat(Seq(deMat1, spMat2)) + val spHorz = Matrices.horzcat(Array(spMat1, spMat2)) + val deHorz1 = Matrices.horzcat(Array(deMat1, deMat2)) + val deHorz2 = Matrices.horzcat(Array(spMat1, deMat2)) + val deHorz3 = Matrices.horzcat(Array(deMat1, spMat2)) assert(deHorz1.numRows === 3) assert(deHorz2.numRows === 3) @@ -179,17 +179,17 @@ class MatricesSuite extends FunSuite { assert(deHorz1(1, 4) === 0.0) intercept[IllegalArgumentException] { - Matrices.horzCat(Seq(spMat1, spMat3)) + Matrices.horzcat(Array(spMat1, spMat3)) } intercept[IllegalArgumentException] { - Matrices.horzCat(Seq(deMat1, spMat3)) + Matrices.horzcat(Array(deMat1, spMat3)) } - val spVert = Matrices.vertCat(Seq(spMat1, spMat3)) - val deVert1 = Matrices.vertCat(Seq(deMat1, deMat3)) - val deVert2 = Matrices.vertCat(Seq(spMat1, deMat3)) - val deVert3 = Matrices.vertCat(Seq(deMat1, spMat3)) + val spVert = Matrices.vertcat(Array(spMat1, spMat3)) + val deVert1 = Matrices.vertcat(Array(deMat1, deMat3)) + val deVert2 = Matrices.vertcat(Array(spMat1, deMat3)) + val deVert3 = Matrices.vertcat(Array(deMat1, spMat3)) assert(deVert1.numRows === 5) assert(deVert2.numRows === 5) @@ -214,11 +214,11 @@ class MatricesSuite extends FunSuite { assert(deVert1(4, 1) === 1.0) intercept[IllegalArgumentException] { - Matrices.vertCat(Seq(spMat1, spMat2)) + Matrices.vertcat(Array(spMat1, spMat2)) } intercept[IllegalArgumentException] { - Matrices.vertCat(Seq(deMat1, spMat2)) + Matrices.vertcat(Array(deMat1, spMat2)) } } } From a8120d2a83720b621b36942add3a98aa4b96bcc3 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 26 Nov 2014 09:26:16 -0800 Subject: [PATCH 05/17] [SPARK-4409] Finished updates to API according to SPARK-4614 --- .../apache/spark/mllib/linalg/Matrices.scala | 3 -- 
.../spark/mllib/linalg/JavaMatricesSuite.java | 31 +++++++++++++------ 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index 0fa2f600721ed..e024eb05af6a6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -19,9 +19,6 @@ package org.apache.spark.mllib.linalg import breeze.linalg.{Matrix => BM, DenseMatrix => BDM, CSCMatrix => BSM} -import org.apache.spark.util.random.XORShiftRandom -import org.apache.spark.util.Utils - import java.util.{Random, Arrays} import scala.collection.mutable.ArrayBuffer diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java index e938071d5c3fb..a75f9a6141e56 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java @@ -21,25 +21,34 @@ import org.junit.Test; import java.io.Serializable; +import java.util.Random; public class JavaMatricesSuite implements Serializable { @Test public void randMatrixConstruction() { - Matrix r = Matrices.rand(3, 4, 24); - DenseMatrix dr = DenseMatrix.rand(3, 4, 24); + Random rng = new Random(24); + Matrix r = Matrices.rand(3, 4, rng); + rng.setSeed(24); + DenseMatrix dr = DenseMatrix.rand(3, 4, rng); assertArrayEquals(r.toArray(), dr.toArray(), 0.0); - Matrix rn = Matrices.randn(3, 4, 24); - DenseMatrix drn = DenseMatrix.randn(3, 4, 24); + rng.setSeed(24); + Matrix rn = Matrices.randn(3, 4, rng); + rng.setSeed(24); + DenseMatrix drn = DenseMatrix.randn(3, 4, rng); assertArrayEquals(rn.toArray(), drn.toArray(), 0.0); - Matrix s = Matrices.sprand(3, 4, 0.5, 24); - SparseMatrix sr = SparseMatrix.sprand(3, 4, 0.5, 24); + rng.setSeed(24); + Matrix s = Matrices.sprand(3, 4, 0.5, rng); + rng.setSeed(24); + SparseMatrix sr = SparseMatrix.sprand(3, 4, 0.5, rng); assertArrayEquals(s.toArray(), sr.toArray(), 0.0); - Matrix sn = Matrices.sprandn(3, 4, 0.5, 24); - SparseMatrix srn = SparseMatrix.sprandn(3, 4, 0.5, 24); + rng.setSeed(24); + Matrix sn = Matrices.sprandn(3, 4, 0.5, rng); + rng.setSeed(24); + SparseMatrix srn = SparseMatrix.sprandn(3, 4, 0.5, rng); assertArrayEquals(sn.toArray(), srn.toArray(), 0.0); } @@ -95,8 +104,10 @@ public void concatenateMatrices() { int m = 3; int n = 2; - SparseMatrix spMat1 = SparseMatrix.sprand(m, n, 0.5, 42); - DenseMatrix deMat1 = DenseMatrix.rand(m, n, 42); + Random rng = new Random(42); + SparseMatrix spMat1 = SparseMatrix.sprand(m, n, 0.5, rng); + rng.setSeed(42); + DenseMatrix deMat1 = DenseMatrix.rand(m, n, rng); Matrix deMat2 = Matrices.eye(3); Matrix spMat2 = Matrices.speye(3); Matrix deMat3 = Matrices.eye(2); From 065b53181349fa0cc56d4828044b1d564791ea80 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 16 Dec 2014 00:54:19 -0800 Subject: [PATCH 06/17] [SPARK-4409] First pass after code review --- .../apache/spark/mllib/linalg/Matrices.scala | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index e024eb05af6a6..4dfc52fcac7c4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -82,10 +82,14 @@ sealed trait Matrix extends Serializable { /** A human readable representation of the matrix */ override def toString: String = toBreeze.toString() - /** Map the values of this matrix using a function. Generates a new matrix. */ + /** Map the values of this matrix using a function. Generates a new matrix. Performs the + * function on only the backing array. For example, an operation such as addition or + * subtraction will only be performed on the non-zero values in a `SparseMatrix`. */ private[mllib] def map(f: Double => Double): Matrix - /** Update all the values of this matrix using the function f. Performed in-place. */ + /** Update all the values of this matrix using the function f. Performed in-place on the + * backing array. For example, an operation such as addition or subtraction will only be + * performed on the non-zero values in a `SparseMatrix`. */ private[mllib] def update(f: Double => Double): Matrix } @@ -175,7 +179,7 @@ object DenseMatrix { def eye(n: Int): DenseMatrix = { val identity = DenseMatrix.zeros(n, n) var i = 0 - while (i < n){ + while (i < n) { identity.update(i, i, 1.0) i += 1 } @@ -286,7 +290,7 @@ class SparseMatrix( private[mllib] def update(i: Int, j: Int, v: Double): Unit = { val ind = index(i, j) - if (ind == -1){ + if (ind == -1) { throw new NoSuchElementException("The given row and column indices correspond to a zero " + "value. Only non-zero elements in Sparse Matrices can be updated.") } else { @@ -341,10 +345,10 @@ object SparseMatrix { raw.foreach { v => val r = i % numRows val c = (i - r) / numRows - if ( v != 0.0) { + if (v != 0.0) { sRows.append(r) sparseA.append(v) - while (c != lastCol){ + while (c != lastCol) { sCols.append(nnz) lastCol += 1 } @@ -352,7 +356,7 @@ object SparseMatrix { } i += 1 } - while (numCols > lastCol){ + while (numCols > lastCol) { sCols.append(sparseA.length) lastCol += 1 } @@ -368,8 +372,8 @@ object SparseMatrix { * @return `SparseMatrix` with size `numRows` x `numCols` and values in U(0, 1) */ def sprand(numRows: Int, numCols: Int, density: Double, rng: Random): SparseMatrix = { - require(density > 0.0 && density < 1.0, "density must be a double in the range " + - s"0.0 < d < 1.0. Currently, density: $density") + require(density >= 0.0 && density <= 1.0, "density must be a double in the range " + + s"0.0 <= d <= 1.0. Currently, density: $density") val length = numRows * numCols val rawA = new Array[Double](length) var nnz = 0 @@ -392,8 +396,8 @@ object SparseMatrix { * @return `SparseMatrix` with size `numRows` x `numCols` and values in N(0, 1) */ def sprandn(numRows: Int, numCols: Int, density: Double, rng: Random): SparseMatrix = { - require(density > 0.0 && density < 1.0, "density must be a double in the range " + - s"0.0 < d < 1.0. Currently, density: $density") + require(density >= 0.0 && density <= 1.0, "density must be a double in the range " + + s"0.0 <= d <= 1.0. Currently, density: $density") val length = numRows * numCols val rawA = new Array[Double](length) var nnz = 0 @@ -408,7 +412,7 @@ object SparseMatrix { } /** - * Generate a diagonal matrix in `DenseMatrix` format from the supplied values. + * Generate a diagonal matrix in `SparseMatrix` format from the supplied values. 
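+ * For example (illustrative), `diag(Vectors.dense(1.0, 0.0, 2.0))` produces a 3 x 3 matrix
+ * that stores only the two non-zero diagonal entries.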
* @param vector a `Vector` that will form the values on the diagonal of the matrix * @return Square `SparseMatrix` with size `values.length` x `values.length` and non-zero * `values` on the diagonal @@ -519,7 +523,7 @@ object Matrices { * Generate a `DenseMatrix` consisting of zeros. * @param numRows number of rows of the matrix * @param numCols number of columns of the matrix - * @return `DenseMatrix` with size `numRows` x `numCols` and values of zeros + * @return `Matrix` with size `numRows` x `numCols` and values of zeros */ def zeros(numRows: Int, numCols: Int): Matrix = DenseMatrix.zeros(numRows, numCols) @@ -527,30 +531,30 @@ object Matrices { * Generate a `DenseMatrix` consisting of ones. * @param numRows number of rows of the matrix * @param numCols number of columns of the matrix - * @return `DenseMatrix` with size `numRows` x `numCols` and values of ones + * @return `Matrix` with size `numRows` x `numCols` and values of ones */ def ones(numRows: Int, numCols: Int): Matrix = DenseMatrix.ones(numRows, numCols) /** * Generate a dense Identity Matrix in `Matrix` format. * @param n number of rows and columns of the matrix - * @return `DenseMatrix` with size `n` x `n` and values of ones on the diagonal + * @return `Matrix` with size `n` x `n` and values of ones on the diagonal */ def eye(n: Int): Matrix = DenseMatrix.eye(n) /** * Generate a sparse Identity Matrix in `Matrix` format. * @param n number of rows and columns of the matrix - * @return `SparseMatrix` with size `n` x `n` and values of ones on the diagonal + * @return `Matrix` with size `n` x `n` and values of ones on the diagonal */ def speye(n: Int): Matrix = SparseMatrix.speye(n) /** - * Generate a dense `Matrix` consisting of i.i.d. uniform random numbers. + * Generate a `DenseMatrix` consisting of i.i.d. uniform random numbers. * @param numRows number of rows of the matrix * @param numCols number of columns of the matrix * @param rng a random number generator - * @return `DenseMatrix` with size `numRows` x `numCols` and values in U(0, 1) + * @return `Matrix` with size `numRows` x `numCols` and values in U(0, 1) */ def rand(numRows: Int, numCols: Int, rng: Random): Matrix = DenseMatrix.rand(numRows, numCols, rng) @@ -571,7 +575,7 @@ object Matrices { * @param numRows number of rows of the matrix * @param numCols number of columns of the matrix * @param rng a random number generator - * @return `DenseMatrix` with size `numRows` x `numCols` and values in N(0, 1) + * @return `Matrix` with size `numRows` x `numCols` and values in N(0, 1) */ def randn(numRows: Int, numCols: Int, rng: Random): Matrix = DenseMatrix.randn(numRows, numCols, rng) @@ -590,7 +594,7 @@ object Matrices { /** * Generate a diagonal matrix in `DenseMatrix` format from the supplied values. * @param vector a `Vector` tat will form the values on the diagonal of the matrix - * @return Square `DenseMatrix` with size `values.length` x `values.length` and `values` + * @return Square `Matrix` with size `values.length` x `values.length` and `values` * on the diagonal */ def diag(vector: Vector): Matrix = DenseMatrix.diag(vector) @@ -653,8 +657,8 @@ object Matrices { * Vertically concatenate a sequence of matrices. The returned matrix will be in the format * the matrices are supplied in. Supplying a mix of dense and sparse matrices will result in * a dense matrix. 
- * @param matrices sequence of matrices - * @return a single `Matrix` composed of the matrices that were horizontally concatenated + * @param matrices array of matrices + * @return a single `Matrix` composed of the matrices that were vertically concatenated */ def vertcat(matrices: Array[Matrix]): Matrix = { if (matrices.size == 1) { From d8be7bc07b23982c4fced647f85982c6b7cadd4b Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 16 Dec 2014 01:00:16 -0800 Subject: [PATCH 07/17] [SPARK-4409] Organized imports --- .../main/scala/org/apache/spark/mllib/linalg/Matrices.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index 4dfc52fcac7c4..b02e756b05db6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -17,11 +17,12 @@ package org.apache.spark.mllib.linalg -import breeze.linalg.{Matrix => BM, DenseMatrix => BDM, CSCMatrix => BSM} +import java.util.{Arrays, Random} -import java.util.{Random, Arrays} import scala.collection.mutable.ArrayBuffer +import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM} + /** * Trait for a local matrix. */ From 65c562e57078ccb31de281b238a9348dd9a1f7c2 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 16 Dec 2014 02:37:54 -0800 Subject: [PATCH 08/17] [SPARK-4409] Hopefully fixed Java Test --- .../java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java index a75f9a6141e56..df811933a080b 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java @@ -82,8 +82,8 @@ public void diagonalMatrixConstruction() { assertArrayEquals(s.values(), ss.values(), 0.0); assert(s.values().length == 2); assert(ss.values().length == 2); - assert(s.colPtrs().length == 2); - assert(ss.colPtrs().length == 2); + assert(s.colPtrs().length == 4); + assert(ss.colPtrs().length == 4); } @Test From e4bd0c02df49b07ed0ee3687c3ac8e44868c857a Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 17 Dec 2014 01:51:48 -0800 Subject: [PATCH 09/17] [SPARK-4409] Modified horzcat and vertcat --- .../apache/spark/mllib/linalg/Matrices.scala | 172 +++++++++++++----- .../spark/mllib/linalg/MatricesSuite.scala | 34 ++-- .../spark/mllib/util/TestingUtils.scala | 6 +- 3 files changed, 145 insertions(+), 67 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index b02e756b05db6..e18046e37225d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -603,7 +603,7 @@ object Matrices { /** * Horizontally concatenate a sequence of matrices. The returned matrix will be in the format * the matrices are supplied in. Supplying a mix of dense and sparse matrices will result in - * a dense matrix. + * a sparse matrix. 
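+ * A usage sketch (illustrative), mixing a dense and a sparse input:
+ * {{{
+ *   val h = Matrices.horzcat(Array(Matrices.eye(2), Matrices.speye(2)))  // 2 x 4, sparse result
+ * }}}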
* @param matrices array of matrices * @return a single `Matrix` composed of the matrices that were horizontally concatenated */ @@ -621,17 +621,42 @@ object Matrices { mat match { case sparse: SparseMatrix => isSparse = true case dense: DenseMatrix => isDense = true + case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " + + s"SparseMatrix or DenseMatrix. Instead got: ${mat.getClass}") } numCols += mat.numCols } require(rowsMatch, "The number of rows of the matrices in this sequence, don't match!") - if (isSparse && !isDense) { + if (!isSparse && isDense) { + new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray).toArray) + } else { val allColPtrs: Array[(Int, Int)] = Array((0, 0)) ++ matrices.zipWithIndex.flatMap { case (mat, ind) => - val ptr = mat.asInstanceOf[SparseMatrix].colPtrs - ptr.slice(1, ptr.length).map(p => (ind, p)) - } + mat match { + case spMat: SparseMatrix => + val ptr = spMat.colPtrs + ptr.slice(1, ptr.length).map(p => (ind, p)) + case dnMat: DenseMatrix => + val colSize = dnMat.numCols + var j = 0 + val rowSize = dnMat.numRows + val ptr = new ArrayBuffer[(Int, Int)](colSize) + var nnz = 0 + val vals = dnMat.values + while (j < colSize) { + var i = j * rowSize + val indEnd = (j + 1) * rowSize + while (i < indEnd) { + if (vals(i) != 0.0) nnz += 1 + i += 1 + } + j += 1 + ptr.append((ind, nnz)) + } + ptr + } + } var counter = 0 var lastIndex = 0 var lastPtr = 0 @@ -643,21 +668,36 @@ object Matrices { lastPtr = p counter + p } + val valsAndIndices: Array[(Int, Double)] = matrices.flatMap { + case spMat: SparseMatrix => + spMat.rowIndices.zip(spMat.values) + case dnMat: DenseMatrix => + val colSize = dnMat.numCols + var j = 0 + val rowSize = dnMat.numRows + val data = new ArrayBuffer[(Int, Double)]() + val vals = dnMat.values + while (j < colSize) { + val indStart = j * rowSize + var i = 0 + while (i < rowSize) { + val index = indStart + i + if (vals(index) != 0.0) data.append((i, vals(index))) + i += 1 + } + j += 1 + } + data + } new SparseMatrix(numRows, numCols, adjustedPtrs, - matrices.flatMap(_.asInstanceOf[SparseMatrix].rowIndices).toArray, - matrices.flatMap(_.asInstanceOf[SparseMatrix].values).toArray) - } else if (!isSparse && !isDense) { - throw new IllegalArgumentException("The supplied matrices are neither in SparseMatrix or" + - " DenseMatrix format!") - }else { - new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray).toArray) + valsAndIndices.map(_._1), valsAndIndices.map(_._2)) } } /** * Vertically concatenate a sequence of matrices. The returned matrix will be in the format * the matrices are supplied in. Supplying a mix of dense and sparse matrices will result in - * a dense matrix. + * a sparse matrix. * @param matrices array of matrices * @return a single `Matrix` composed of the matrices that were vertically concatenated */ @@ -680,27 +720,58 @@ object Matrices { case dense: DenseMatrix => isDense = true valsLength += dense.values.length + case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " + + s"SparseMatrix or DenseMatrix. 
Instead got: ${mat.getClass}") } numRows += mat.numRows } require(colsMatch, "The number of rows of the matrices in this sequence, don't match!") - if (isSparse && !isDense) { - val matMap = matrices.zipWithIndex.map(d => (d._2, d._1.asInstanceOf[SparseMatrix])).toMap - // (matrixInd, colInd, colStart, colEnd, numRows) - val allColPtrs: Seq[(Int, Int, Int, Int, Int)] = - matMap.flatMap { case (ind, mat) => - val ptr = mat.colPtrs - var colStart = 0 - var j = 0 - ptr.slice(1, ptr.length).map { p => - j += 1 - val oldColStart = colStart - colStart = p - (j - 1, ind, oldColStart, p, mat.numRows) - } - }.toSeq + if (!isSparse && isDense) { + val matData = matrices.zipWithIndex.flatMap { case (mat, ind) => + val values = mat.toArray + for (j <- 0 until numCols) yield (j, ind, + values.slice(j * mat.numRows, (j + 1) * mat.numRows)) + }.sortBy(x => (x._1, x._2)) + new DenseMatrix(numRows, numCols, matData.flatMap(_._3).toArray) + } else { + val matMap = matrices.zipWithIndex.map(d => (d._2, d._1)).toMap + // (colInd, matrixInd, colStart, colEnd, numRows) + val allColPtrs: Seq[(Int, Int, Int, Int, Int)] = matMap.flatMap { case (ind, mat) => + mat match { + case spMat: SparseMatrix => + val ptr = spMat.colPtrs + var colStart = 0 + var j = 0 + ptr.slice(1, ptr.length).map { p => + j += 1 + val oldColStart = colStart + colStart = p + (j - 1, ind, oldColStart, p, spMat.numRows) + } + case dnMat: DenseMatrix => + val colSize = dnMat.numCols + var j = 0 + val rowSize = dnMat.numRows + val ptr = new ArrayBuffer[(Int, Int, Int, Int, Int)](colSize) + var nnz = 0 + val vals = dnMat.values + var colStart = 0 + while (j < colSize) { + var i = j * rowSize + val indEnd = (j + 1) * rowSize + while (i < indEnd) { + if (vals(i) != 0.0) nnz += 1 + i += 1 + } + ptr.append((j, ind, colStart, nnz, dnMat.numRows)) + j += 1 + colStart = nnz + } + ptr + } + }.toSeq val values = new ArrayBuffer[Double](valsLength) val rowInd = new ArrayBuffer[Int](valsLength) val newColPtrs = new Array[Int](numCols) @@ -712,31 +783,38 @@ object Matrices { var startRow = 0 sortedPtrs.foreach { case (colIdx, matrixInd, colStart, colEnd, nRows) => val selectedMatrix = matMap(matrixInd) - val selectedValues = selectedMatrix.values.slice(colStart, colEnd) - val selectedRowIdx = selectedMatrix.rowIndices.slice(colStart, colEnd) - val len = selectedValues.length - newColPtrs(colIdx) += len - var i = 0 - while (i < len) { - values.append(selectedValues(i)) - rowInd.append(selectedRowIdx(i) + startRow) - i += 1 + selectedMatrix match { + case spMat: SparseMatrix => + val selectedValues = spMat.values + val selectedRowIdx = spMat.rowIndices + val len = colEnd - colStart + newColPtrs(colIdx) += len + var i = colStart + while (i < colEnd) { + values.append(selectedValues(i)) + rowInd.append(selectedRowIdx(i) + startRow) + i += 1 + } + case dnMat: DenseMatrix => + val selectedValues = dnMat.values + val len = colEnd - colStart + newColPtrs(colIdx) += len + val indStart = colIdx * nRows + var i = 0 + while (i < nRows) { + val v = selectedValues(indStart + i) + if (v != 0) { + values.append(v) + rowInd.append(i + startRow) + } + i += 1 + } } startRow += nRows } } val adjustedPtrs = newColPtrs.scanLeft(0)(_ + _) new SparseMatrix(numRows, numCols, adjustedPtrs, rowInd.toArray, values.toArray) - } else if (!isSparse && !isDense) { - throw new IllegalArgumentException("The supplied matrices are neither in SparseMatrix or" + - " DenseMatrix format!") - }else { - val matData = matrices.zipWithIndex.flatMap { case (mat, ind) => - val values = mat.toArray - for 
(j <- 0 until numCols) yield (j, ind, - values.slice(j * mat.numRows, (j + 1) * mat.numRows)) - }.sortBy(x => (x._1, x._2)) - new DenseMatrix(numRows, numCols, matData.flatMap(_._3).toArray) } } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala index 6526bd614fe40..c9a2b1f0454b5 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala @@ -152,21 +152,21 @@ class MatricesSuite extends FunSuite { val spMat3 = Matrices.speye(2) val spHorz = Matrices.horzcat(Array(spMat1, spMat2)) + val spHorz2 = Matrices.horzcat(Array(spMat1, deMat2)) + val spHorz3 = Matrices.horzcat(Array(deMat1, spMat2)) val deHorz1 = Matrices.horzcat(Array(deMat1, deMat2)) - val deHorz2 = Matrices.horzcat(Array(spMat1, deMat2)) - val deHorz3 = Matrices.horzcat(Array(deMat1, spMat2)) assert(deHorz1.numRows === 3) - assert(deHorz2.numRows === 3) - assert(deHorz3.numRows === 3) + assert(spHorz2.numRows === 3) + assert(spHorz3.numRows === 3) assert(spHorz.numRows === 3) assert(deHorz1.numCols === 5) - assert(deHorz2.numCols === 5) - assert(deHorz3.numCols === 5) + assert(spHorz2.numCols === 5) + assert(spHorz3.numCols === 5) assert(spHorz.numCols === 5) - assert(deHorz1 === deHorz2) - assert(deHorz2 === deHorz3) + assert(deHorz1.toBreeze.toDenseMatrix === spHorz2.toBreeze.toDenseMatrix) + assert(spHorz2.toBreeze === spHorz3.toBreeze) assert(spHorz(0, 0) === 1.0) assert(spHorz(2, 1) === 5.0) assert(spHorz(0, 2) === 1.0) @@ -177,7 +177,7 @@ class MatricesSuite extends FunSuite { assert(deHorz1(0, 0) === 1.0) assert(deHorz1(2, 1) === 5.0) assert(deHorz1(0, 2) === 1.0) - assert(deHorz1(1, 2) === 0.0) + assert(deHorz1(1, 2) == 0.0) assert(deHorz1(1, 3) === 1.0) assert(deHorz1(2, 4) === 1.0) assert(deHorz1(1, 4) === 0.0) @@ -192,20 +192,20 @@ class MatricesSuite extends FunSuite { val spVert = Matrices.vertcat(Array(spMat1, spMat3)) val deVert1 = Matrices.vertcat(Array(deMat1, deMat3)) - val deVert2 = Matrices.vertcat(Array(spMat1, deMat3)) - val deVert3 = Matrices.vertcat(Array(deMat1, spMat3)) + val spVert2 = Matrices.vertcat(Array(spMat1, deMat3)) + val spVert3 = Matrices.vertcat(Array(deMat1, spMat3)) assert(deVert1.numRows === 5) - assert(deVert2.numRows === 5) - assert(deVert3.numRows === 5) + assert(spVert2.numRows === 5) + assert(spVert3.numRows === 5) assert(spVert.numRows === 5) assert(deVert1.numCols === 2) - assert(deVert2.numCols === 2) - assert(deVert3.numCols === 2) + assert(spVert2.numCols === 2) + assert(spVert3.numCols === 2) assert(spVert.numCols === 2) - assert(deVert1 === deVert2) - assert(deVert2 === deVert3) + assert(deVert1.toBreeze.toDenseMatrix === spVert2.toBreeze.toDenseMatrix) + assert(spVert2.toBreeze === spVert3.toBreeze) assert(spVert(0, 0) === 1.0) assert(spVert(2, 1) === 5.0) assert(spVert(3, 0) === 1.0) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/TestingUtils.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/TestingUtils.scala index 30b906aaa3ba4..e957fa5d25f4c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/TestingUtils.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/TestingUtils.scala @@ -178,17 +178,17 @@ object TestingUtils { implicit class MatrixWithAlmostEquals(val x: Matrix) { /** - * When the difference of two vectors are within eps, returns true; otherwise, returns false. 
+ * When the difference of two matrices are within eps, returns true; otherwise, returns false. */ def ~=(r: CompareMatrixRightSide): Boolean = r.fun(x, r.y, r.eps) /** - * When the difference of two vectors are within eps, returns false; otherwise, returns true. + * When the difference of two matrices are within eps, returns false; otherwise, returns true. */ def !~=(r: CompareMatrixRightSide): Boolean = !r.fun(x, r.y, r.eps) /** - * Throws exception when the difference of two vectors are NOT within eps; + * Throws exception when the difference of two matrices are NOT within eps; * otherwise, returns true. */ def ~==(r: CompareMatrixRightSide): Boolean = { From 75239f8e5b41a275a0f232108b26cb0e16935bbf Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 18 Dec 2014 07:10:27 -0800 Subject: [PATCH 10/17] [SPARK-4409] Second pass of code review --- .../apache/spark/mllib/linalg/Matrices.scala | 408 ++++++++---------- .../spark/mllib/linalg/MatricesSuite.scala | 32 +- 2 files changed, 199 insertions(+), 241 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index e18046e37225d..fc2f2fd9ca92b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg import java.util.{Arrays, Random} -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, Map} import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM} @@ -147,6 +147,35 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) } this } + + /** Generate a `SparseMatrix` from the given `DenseMatrix`. */ + def toSparse(): SparseMatrix = { + val sparseA: ArrayBuffer[Double] = new ArrayBuffer() + val sCols: ArrayBuffer[Int] = new ArrayBuffer(numCols + 1) + val sRows: ArrayBuffer[Int] = new ArrayBuffer() + var i = 0 + var nnz = 0 + var lastCol = -1 + values.foreach { v => + val r = i % numRows + val c = (i - r) / numRows + if (v != 0.0) { + sRows.append(r) + sparseA.append(v) + while (c != lastCol) { + sCols.append(nnz) + lastCol += 1 + } + nnz += 1 + } + i += 1 + } + while (numCols > lastCol) { + sCols.append(sparseA.length) + lastCol += 1 + } + new SparseMatrix(numRows, numCols, sCols.toArray, sRows.toArray, sparseA.toArray) + } } /** @@ -217,7 +246,7 @@ object DenseMatrix { */ def diag(vector: Vector): DenseMatrix = { val n = vector.size - val matrix = DenseMatrix.eye(n) + val matrix = DenseMatrix.zeros(n, n) val values = vector.toArray var i = 0 while (i < n) { @@ -259,6 +288,8 @@ class SparseMatrix( require(colPtrs.length == numCols + 1, "The length of the column indices should be the " + s"number of columns + 1. Currently, colPointers.length: ${colPtrs.length}, " + s"numCols: $numCols") + require(values.length == colPtrs.last, "The last value of colPtrs must equal the number of " + + s"elements. values.length: ${values.length}, colPtrs.last: ${colPtrs.last}") override def toArray: Array[Double] = { val arr = new Array[Double](numRows * numCols) @@ -313,6 +344,11 @@ class SparseMatrix( } this } + + /** Generate a `DenseMatrix` from the given `SparseMatrix`. */ + def toDense(): DenseMatrix = { + new DenseMatrix(numRows, numCols, toArray) + } } /** @@ -320,6 +356,39 @@ class SparseMatrix( */ object SparseMatrix { + /** + * Generate a `SparseMatrix` from Coordinate List (COO) format. 
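As a concrete reference for the CSC invariant enforced by the new `require` (colPtrs.last must equal values.length) and for the toDense()/toSparse() pair added here, a small sketch with made-up values:

    // 3 x 2 matrix with nonzeros (0,0)=1.0, (2,0)=2.0, (1,1)=3.0 in CSC form;
    // colPtrs(j) .. colPtrs(j + 1) delimit column j, so colPtrs.last == values.length.
    val sm = new SparseMatrix(3, 2, Array(0, 2, 3), Array(0, 2, 1), Array(1.0, 2.0, 3.0))
    val dm = sm.toDense()   // column-major values: 1.0, 0.0, 2.0, 0.0, 3.0, 0.0
    val rt = dm.toSparse()  // round-trips to the same colPtrs, rowIndices and values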
Input must be an array of + * (row, column, value) tuples. Array must be sorted first by *column* index and then by row + * index. + * @param numRows number of rows of the matrix + * @param numCols number of columns of the matrix + * @param entries Array of ((row, column), value) tuples + * @return The corresponding `SparseMatrix` + */ + def fromCOO(numRows: Int, numCols: Int, entries: Array[((Int, Int), Double)]): SparseMatrix = { + val colPtrs = new ArrayBuffer[Int](numCols + 1) + colPtrs.append(0) + var nnz = 0 + var lastCol = 0 + val values = entries.map { case ((i, j), v) => + while (j != lastCol) { + colPtrs.append(nnz) + lastCol += 1 + if (lastCol > numCols) { + throw new IndexOutOfBoundsException("Please make sure that the entries array is " + + "sorted by COLUMN index first and then by row index.") + } + } + nnz += 1 + v + } + while (numCols > lastCol) { + colPtrs.append(nnz) + lastCol += 1 + } + new SparseMatrix(numRows, numCols, colPtrs.toArray, entries.map(_._1._1), values) + } + /** * Generate an Identity Matrix in `SparseMatrix` format. * @param n number of rows and columns of the matrix @@ -329,43 +398,36 @@ object SparseMatrix { new SparseMatrix(n, n, (0 to n).toArray, (0 until n).toArray, Array.fill(n)(1.0)) } - /** Generates a SparseMatrix given an Array[Double] of size numRows * numCols. The number of - * non-zeros in `raw` is provided for efficiency. */ - private def genRand( + /** Generates a `SparseMatrix` with a given random number generator and `method`, which + * specifies the distribution. */ + private def genRandMatrix( numRows: Int, numCols: Int, - raw: Array[Double], - nonZero: Int): SparseMatrix = { - val sparseA: ArrayBuffer[Double] = new ArrayBuffer(nonZero) - val sCols: ArrayBuffer[Int] = new ArrayBuffer(numCols + 1) - val sRows: ArrayBuffer[Int] = new ArrayBuffer(nonZero) - + density: Double, + rng: Random, + method: Random => Double): SparseMatrix = { + require(density >= 0.0 && density <= 1.0, "density must be a double in the range " + + s"0.0 <= d <= 1.0. Currently, density: $density") + val length = math.ceil(numRows * numCols * density).toInt + val entries = Map[(Int, Int), Double]() var i = 0 - var nnz = 0 - var lastCol = -1 - raw.foreach { v => - val r = i % numRows - val c = (i - r) / numRows - if (v != 0.0) { - sRows.append(r) - sparseA.append(v) - while (c != lastCol) { - sCols.append(nnz) - lastCol += 1 - } - nnz += 1 + while (i < length) { + var rowIndex = rng.nextInt(numRows) + var colIndex = rng.nextInt(numCols) + while (entries.contains((rowIndex, colIndex))) { + rowIndex = rng.nextInt(numRows) + colIndex = rng.nextInt(numCols) } + entries += (rowIndex, colIndex) -> method(rng) i += 1 } - while (numCols > lastCol) { - sCols.append(sparseA.length) - lastCol += 1 - } - new SparseMatrix(numRows, numCols, sCols.toArray, sRows.toArray, sparseA.toArray) + SparseMatrix.fromCOO(numRows, numCols, entries.toArray.sortBy(v => (v._1._2, v._1._1))) } /** - * Generate a `SparseMatrix` consisting of i.i.d. uniform random numbers. + * Generate a `SparseMatrix` consisting of i.i.d. uniform random numbers. 
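A usage sketch of the two generators (sizes, seed and density below are arbitrary); both share the same layout logic and differ only in the value distribution:

    val rng = new java.util.Random(42L)
    val sp  = SparseMatrix.sprand(4, 6, 0.25, rng)  // ceil(4 * 6 * 0.25) = 6 entries in U(0, 1)
    val spn = SparseMatrix.sprandn(4, 6, 0.25, rng) // same layout logic, values from N(0, 1)
    // Each stored entry lands at a distinct (row, column) position.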
The number of non-zero + * elements equal the ceiling of `numRows` x `numCols` x `density` + * * @param numRows number of rows of the matrix * @param numCols number of columns of the matrix * @param density the desired density for the matrix @@ -373,19 +435,8 @@ object SparseMatrix { * @return `SparseMatrix` with size `numRows` x `numCols` and values in U(0, 1) */ def sprand(numRows: Int, numCols: Int, density: Double, rng: Random): SparseMatrix = { - require(density >= 0.0 && density <= 1.0, "density must be a double in the range " + - s"0.0 <= d <= 1.0. Currently, density: $density") - val length = numRows * numCols - val rawA = new Array[Double](length) - var nnz = 0 - for (i <- 0 until length) { - val p = rng.nextDouble() - if (p <= density) { - rawA.update(i, rng.nextDouble()) - nnz += 1 - } - } - genRand(numRows, numCols, rawA, nnz) + def method(rand: Random): Double = rand.nextDouble() + genRandMatrix(numRows, numCols, density, rng, method) } /** @@ -397,19 +448,8 @@ object SparseMatrix { * @return `SparseMatrix` with size `numRows` x `numCols` and values in N(0, 1) */ def sprandn(numRows: Int, numCols: Int, density: Double, rng: Random): SparseMatrix = { - require(density >= 0.0 && density <= 1.0, "density must be a double in the range " + - s"0.0 <= d <= 1.0. Currently, density: $density") - val length = numRows * numCols - val rawA = new Array[Double](length) - var nnz = 0 - for (i <- 0 until length) { - val p = rng.nextDouble() - if (p <= density) { - rawA.update(i, rng.nextGaussian()) - nnz += 1 - } - } - genRand(numRows, numCols, rawA, nnz) + def method(rand: Random): Double = rand.nextGaussian() + genRandMatrix(numRows, numCols, density, rng, method) } /** @@ -422,47 +462,12 @@ object SparseMatrix { val n = vector.size vector match { case sVec: SparseVector => - val rows = sVec.indices - val values = sVec.values - var i = 0 - var lastCol = -1 - val colPtrs = new ArrayBuffer[Int](n + 1) - rows.foreach { r => - while (r != lastCol) { - colPtrs.append(i) - lastCol += 1 - } - i += 1 - } - while (n > lastCol) { - colPtrs.append(i) - lastCol += 1 - } - new SparseMatrix(n, n, colPtrs.toArray, rows, values) + val indices = sVec.indices.map(i => (i, i)) + SparseMatrix.fromCOO(n, n, indices.zip(sVec.values)) case dVec: DenseVector => - val values = dVec.values - var i = 0 - var nnz = 0 - val sVals = values.filter(v => v != 0.0) - var lastCol = -1 - val colPtrs = new ArrayBuffer[Int](n + 1) - val sRows = new ArrayBuffer[Int](sVals.length) - values.foreach { v => - if (v != 0.0) { - sRows.append(i) - while (lastCol != i) { - colPtrs.append(nnz) - lastCol += 1 - } - nnz += 1 - } - i += 1 - } - while (lastCol != i) { - colPtrs.append(nnz) - lastCol += 1 - } - new SparseMatrix(n, n, colPtrs.toArray, sRows.toArray, sVals) + val values = dVec.values.zipWithIndex + val nnzVals = values.filter(v => v._1 != 0.0) + SparseMatrix.fromCOO(n, n, nnzVals.map(v => ((v._2, v._2), v._1))) } } } @@ -603,24 +608,26 @@ object Matrices { /** * Horizontally concatenate a sequence of matrices. The returned matrix will be in the format * the matrices are supplied in. Supplying a mix of dense and sparse matrices will result in - * a sparse matrix. + * a sparse matrix. If the Array is empty, an empty `DenseMatrix` will be returned. 
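A quick illustration of that dispatch, using the existing dense factory and the speye factory added in this patch (shapes are arbitrary):

    val d = Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))
    val s = Matrices.speye(2)
    Matrices.horzcat(Array(d, d))     // 2 x 4 DenseMatrix: all-dense inputs stay dense
    Matrices.horzcat(Array(d, s))     // 2 x 4 SparseMatrix: mixed inputs fall back to sparse
    Matrices.horzcat(Array[Matrix]()) // 0 x 0 empty DenseMatrix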
* @param matrices array of matrices * @return a single `Matrix` composed of the matrices that were horizontally concatenated */ def horzcat(matrices: Array[Matrix]): Matrix = { if (matrices.size == 1) { return matrices(0) + } else if (matrices.size == 0) { + return new DenseMatrix(0, 0, Array[Double]()) } val numRows = matrices(0).numRows var rowsMatch = true - var isDense = false - var isSparse = false + var hasDense = false + var hasSparse = false var numCols = 0 matrices.foreach { mat => if (numRows != mat.numRows) rowsMatch = false mat match { - case sparse: SparseMatrix => isSparse = true - case dense: DenseMatrix => isDense = true + case sparse: SparseMatrix => hasSparse = true + case dense: DenseMatrix => hasDense = true case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " + s"SparseMatrix or DenseMatrix. Instead got: ${mat.getClass}") } @@ -628,97 +635,69 @@ object Matrices { } require(rowsMatch, "The number of rows of the matrices in this sequence, don't match!") - if (!isSparse && isDense) { - new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray).toArray) + if (!hasSparse && hasDense) { + new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray)) } else { - val allColPtrs: Array[(Int, Int)] = Array((0, 0)) ++ - matrices.zipWithIndex.flatMap { case (mat, ind) => - mat match { - case spMat: SparseMatrix => - val ptr = spMat.colPtrs - ptr.slice(1, ptr.length).map(p => (ind, p)) - case dnMat: DenseMatrix => - val colSize = dnMat.numCols - var j = 0 - val rowSize = dnMat.numRows - val ptr = new ArrayBuffer[(Int, Int)](colSize) - var nnz = 0 - val vals = dnMat.values - while (j < colSize) { - var i = j * rowSize - val indEnd = (j + 1) * rowSize - while (i < indEnd) { - if (vals(i) != 0.0) nnz += 1 - i += 1 - } + var startCol = 0 + val entries: Array[((Int, Int), Double)] = matrices.flatMap { + case spMat: SparseMatrix => + var j = 0 + var cnt = 0 + val ptr = spMat.colPtrs + val data = spMat.rowIndices.zip(spMat.values).map { case (i, v) => + cnt += 1 + if (cnt <= ptr(j + 1)) { + ((i, j + startCol), v) + } else { + while (ptr(j + 1) < cnt) { j += 1 - ptr.append((ind, nnz)) } - ptr + ((i, j + startCol), v) + } } - } - var counter = 0 - var lastIndex = 0 - var lastPtr = 0 - val adjustedPtrs = allColPtrs.map { case (ind, p) => - if (ind != lastIndex) { - counter += lastPtr - lastIndex = ind - } - lastPtr = p - counter + p - } - val valsAndIndices: Array[(Int, Double)] = matrices.flatMap { - case spMat: SparseMatrix => - spMat.rowIndices.zip(spMat.values) + startCol += spMat.numCols + data case dnMat: DenseMatrix => - val colSize = dnMat.numCols - var j = 0 - val rowSize = dnMat.numRows - val data = new ArrayBuffer[(Int, Double)]() - val vals = dnMat.values - while (j < colSize) { - val indStart = j * rowSize - var i = 0 - while (i < rowSize) { - val index = indStart + i - if (vals(index) != 0.0) data.append((i, vals(index))) - i += 1 - } - j += 1 + val nnzValues = dnMat.values.zipWithIndex.filter(v => v._1 != 0.0) + val data = nnzValues.map { case (v, i) => + val rowIndex = i % dnMat.numRows + val colIndex = i / dnMat.numRows + ((rowIndex, colIndex + startCol), v) } + startCol += dnMat.numCols data } - new SparseMatrix(numRows, numCols, adjustedPtrs, - valsAndIndices.map(_._1), valsAndIndices.map(_._2)) + SparseMatrix.fromCOO(numRows, numCols, entries) } } /** * Vertically concatenate a sequence of matrices. The returned matrix will be in the format * the matrices are supplied in. 
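Because the dense storage is column-major, vertical stacking has to interleave each input's column slices instead of simply concatenating the value arrays (which is all horzcat needs to do). A sketch with made-up 2 x 2 blocks:

    val top    = Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0)) // columns (1,2) and (3,4)
    val bottom = Matrices.dense(2, 2, Array(5.0, 6.0, 7.0, 8.0)) // columns (5,6) and (7,8)
    val stacked = Matrices.vertcat(Array(top, bottom))           // 4 x 2 DenseMatrix
    assert(stacked.toArray.toSeq == Seq(1.0, 2.0, 5.0, 6.0, 3.0, 4.0, 7.0, 8.0))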
Supplying a mix of dense and sparse matrices will result in - * a sparse matrix. + * a sparse matrix. If the Array is empty, an empty `DenseMatrix` will be returned. * @param matrices array of matrices * @return a single `Matrix` composed of the matrices that were vertically concatenated */ def vertcat(matrices: Array[Matrix]): Matrix = { if (matrices.size == 1) { return matrices(0) + } else if (matrices.size == 0) { + return new DenseMatrix(0, 0, Array[Double]()) } val numCols = matrices(0).numCols var colsMatch = true - var isDense = false - var isSparse = false + var hasDense = false + var hasSparse = false var numRows = 0 var valsLength = 0 matrices.foreach { mat => if (numCols != mat.numCols) colsMatch = false mat match { case sparse: SparseMatrix => - isSparse = true + hasSparse = true valsLength += sparse.values.length case dense: DenseMatrix => - isDense = true + hasDense = true valsLength += dense.values.length case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " + s"SparseMatrix or DenseMatrix. Instead got: ${mat.getClass}") @@ -728,93 +707,44 @@ object Matrices { } require(colsMatch, "The number of rows of the matrices in this sequence, don't match!") - if (!isSparse && isDense) { + if (!hasSparse && hasDense) { val matData = matrices.zipWithIndex.flatMap { case (mat, ind) => val values = mat.toArray for (j <- 0 until numCols) yield (j, ind, values.slice(j * mat.numRows, (j + 1) * mat.numRows)) }.sortBy(x => (x._1, x._2)) - new DenseMatrix(numRows, numCols, matData.flatMap(_._3).toArray) + new DenseMatrix(numRows, numCols, matData.flatMap(_._3)) } else { - val matMap = matrices.zipWithIndex.map(d => (d._2, d._1)).toMap - // (colInd, matrixInd, colStart, colEnd, numRows) - val allColPtrs: Seq[(Int, Int, Int, Int, Int)] = matMap.flatMap { case (ind, mat) => - mat match { - case spMat: SparseMatrix => - val ptr = spMat.colPtrs - var colStart = 0 - var j = 0 - ptr.slice(1, ptr.length).map { p => - j += 1 - val oldColStart = colStart - colStart = p - (j - 1, ind, oldColStart, p, spMat.numRows) - } - case dnMat: DenseMatrix => - val colSize = dnMat.numCols - var j = 0 - val rowSize = dnMat.numRows - val ptr = new ArrayBuffer[(Int, Int, Int, Int, Int)](colSize) - var nnz = 0 - val vals = dnMat.values - var colStart = 0 - while (j < colSize) { - var i = j * rowSize - val indEnd = (j + 1) * rowSize - while (i < indEnd) { - if (vals(i) != 0.0) nnz += 1 - i += 1 + var startRow = 0 + val entries: Array[((Int, Int), Double)] = matrices.flatMap { + case spMat: SparseMatrix => + var j = 0 + var cnt = 0 + val ptr = spMat.colPtrs + val data = spMat.rowIndices.zip(spMat.values).map { case (i, v) => + cnt += 1 + if (cnt <= ptr(j + 1)) { + ((i + startRow, j), v) + } else { + while (ptr(j + 1) < cnt) { + j += 1 } - ptr.append((j, ind, colStart, nnz, dnMat.numRows)) - j += 1 - colStart = nnz + ((i + startRow, j), v) } - ptr - } - }.toSeq - val values = new ArrayBuffer[Double](valsLength) - val rowInd = new ArrayBuffer[Int](valsLength) - val newColPtrs = new Array[Int](numCols) - - // group metadata by column index and then sort in increasing order of column index - allColPtrs.groupBy(_._1).toArray.sortBy(_._1).foreach { case (colInd, data) => - // then sort by matrix index - val sortedPtrs = data.sortBy(_._1) - var startRow = 0 - sortedPtrs.foreach { case (colIdx, matrixInd, colStart, colEnd, nRows) => - val selectedMatrix = matMap(matrixInd) - selectedMatrix match { - case spMat: SparseMatrix => - val selectedValues = spMat.values - val selectedRowIdx = 
spMat.rowIndices - val len = colEnd - colStart - newColPtrs(colIdx) += len - var i = colStart - while (i < colEnd) { - values.append(selectedValues(i)) - rowInd.append(selectedRowIdx(i) + startRow) - i += 1 - } - case dnMat: DenseMatrix => - val selectedValues = dnMat.values - val len = colEnd - colStart - newColPtrs(colIdx) += len - val indStart = colIdx * nRows - var i = 0 - while (i < nRows) { - val v = selectedValues(indStart + i) - if (v != 0) { - values.append(v) - rowInd.append(i + startRow) - } - i += 1 - } } - startRow += nRows - } - } - val adjustedPtrs = newColPtrs.scanLeft(0)(_ + _) - new SparseMatrix(numRows, numCols, adjustedPtrs, rowInd.toArray, values.toArray) + startRow += spMat.numRows + data + case dnMat: DenseMatrix => + val nnzValues = dnMat.values.zipWithIndex.filter(v => v._1 != 0.0) + val data = nnzValues.map { case (v, i) => + val rowIndex = i % dnMat.numRows + val colIndex = i / dnMat.numRows + ((rowIndex + startRow, colIndex), v) + } + startRow += dnMat.numRows + data + }.sortBy(d => (d._1._2, d._1._1)) + SparseMatrix.fromCOO(numRows, numCols, entries) } } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala index c9a2b1f0454b5..f5933ddcfca2e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala @@ -43,9 +43,9 @@ class MatricesSuite extends FunSuite { test("sparse matrix construction") { val m = 3 - val n = 2 + val n = 4 val values = Array(1.0, 2.0, 4.0, 5.0) - val colPtrs = Array(0, 2, 4) + val colPtrs = Array(0, 2, 2, 4, 4) val rowIndices = Array(1, 2, 1, 2) val mat = Matrices.sparse(m, n, colPtrs, rowIndices, values).asInstanceOf[SparseMatrix] assert(mat.numRows === m) @@ -53,6 +53,12 @@ class MatricesSuite extends FunSuite { assert(mat.values.eq(values), "should not copy data") assert(mat.colPtrs.eq(colPtrs), "should not copy data") assert(mat.rowIndices.eq(rowIndices), "should not copy data") + + val entries: Array[((Int, Int), Double)] = Array(((1, 0), 1.0), ((2, 0), 2.0), + ((1, 2), 4.0), ((2, 2), 5.0)) + + val mat2 = SparseMatrix.fromCOO(m, n, entries) + assert(mat.toBreeze === mat2.toBreeze) } test("sparse matrix construction with wrong number of elements") { @@ -271,4 +277,26 @@ class MatricesSuite extends FunSuite { assert(mat.numCols === 2) assert(mat.values.toSeq === Seq(1.0, 0.0, 0.0, 2.0)) } + + test("sprand") { + val rng = mock[Random] + when(rng.nextInt(4)).thenReturn(0, 1, 1, 3, 2, 2, 0, 1, 3, 0) + when(rng.nextDouble()).thenReturn(1.0, 2.0, 3.0, 4.0) + val mat = SparseMatrix.sprand(4, 4, 0.25, rng) + assert(mat.numRows === 4) + assert(mat.numCols === 4) + assert(mat.rowIndices.toSeq === Seq(3, 0, 2, 1)) + assert(mat.values.toSeq === Seq(4.0, 1.0, 3.0, 2.0)) + } + + test("sprandn") { + val rng = mock[Random] + when(rng.nextInt(4)).thenReturn(0, 1, 1, 3, 2, 2, 0, 1, 3, 0) + when(rng.nextGaussian()).thenReturn(1.0, 2.0, 3.0, 4.0) + val mat = SparseMatrix.sprandn(4, 4, 0.25, rng) + assert(mat.numRows === 4) + assert(mat.numCols === 4) + assert(mat.rowIndices.toSeq === Seq(3, 0, 2, 1)) + assert(mat.values.toSeq === Seq(4.0, 1.0, 3.0, 2.0)) + } } From 3971c931d18dfaea0ea66e0cbb19b61dbf310a66 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 19 Dec 2014 01:44:31 -0800 Subject: [PATCH 11/17] [SPARK-4409] Third pass of code review --- .../apache/spark/mllib/linalg/Matrices.scala | 236 +++++++++++------- 
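Building on the construction test just above: the COO factory, as defined at this point in the series, expects the entries sorted by column and then row, and emits the CSC arrays in one pass. A small illustrative case:

    val entries: Array[((Int, Int), Double)] =
      Array(((0, 0), 1.0), ((2, 0), 9.0), ((1, 2), 5.0))
    val m = SparseMatrix.fromCOO(3, 3, entries)
    // m.colPtrs    == Array(0, 2, 2, 3)  (column 1 is empty)
    // m.rowIndices == Array(0, 2, 1)
    // m.values     == Array(1.0, 9.0, 5.0)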
.../spark/mllib/linalg/JavaMatricesSuite.java | 19 ++ .../spark/mllib/linalg/MatricesSuite.scala | 31 ++- 3 files changed, 189 insertions(+), 97 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index fc2f2fd9ca92b..9d2611e0ed8e9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg import java.util.{Arrays, Random} -import scala.collection.mutable.{ArrayBuffer, Map} +import scala.collection.mutable.{ArrayBuffer, ArrayBuilder, Map} import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM} @@ -150,31 +150,35 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) /** Generate a `SparseMatrix` from the given `DenseMatrix`. */ def toSparse(): SparseMatrix = { - val sparseA: ArrayBuffer[Double] = new ArrayBuffer() - val sCols: ArrayBuffer[Int] = new ArrayBuffer(numCols + 1) - val sRows: ArrayBuffer[Int] = new ArrayBuffer() - var i = 0 + val spVals: ArrayBuilder[Double] = new ArrayBuilder.ofDouble + val colPtrs: Array[Int] = new Array[Int](numCols + 1) + val rowIndices: ArrayBuilder[Int] = new ArrayBuilder.ofInt var nnz = 0 var lastCol = -1 - values.foreach { v => - val r = i % numRows - val c = (i - r) / numRows - if (v != 0.0) { - sRows.append(r) - sparseA.append(v) - while (c != lastCol) { - sCols.append(nnz) - lastCol += 1 + var j = 0 + while (j < numCols) { + var i = 0 + val indStart = j * numRows + while (i < numRows) { + val v = values(indStart + i) + if (v != 0.0) { + rowIndices += i + spVals += v + while (j != lastCol) { + colPtrs(lastCol + 1) = nnz + lastCol += 1 + } + nnz += 1 } - nnz += 1 + i += 1 } - i += 1 + j += 1 } while (numCols > lastCol) { - sCols.append(sparseA.length) + colPtrs(lastCol + 1) = nnz lastCol += 1 } - new SparseMatrix(numRows, numCols, sCols.toArray, sRows.toArray, sparseA.toArray) + new SparseMatrix(numRows, numCols, colPtrs, rowIndices.result(), spVals.result()) } } @@ -358,35 +362,30 @@ object SparseMatrix { /** * Generate a `SparseMatrix` from Coordinate List (COO) format. Input must be an array of - * (row, column, value) tuples. Array must be sorted first by *column* index and then by row - * index. + * (row, column, value) tuples. 
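The switch from ArrayBuffer to ArrayBuilder in toSparse() above avoids boxing every Double while the nonzeros are collected; a minimal sketch of the pattern:

    import scala.collection.mutable.ArrayBuilder

    val b = new ArrayBuilder.ofDouble
    b.sizeHint(4)       // optional pre-allocation when the final size is known
    b += 1.0
    b += 2.5
    val arr: Array[Double] = b.result()  // backed by a primitive Array[Double]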
* @param numRows number of rows of the matrix * @param numCols number of columns of the matrix - * @param entries Array of ((row, column), value) tuples + * @param entries Array of (row, column, value) tuples * @return The corresponding `SparseMatrix` */ - def fromCOO(numRows: Int, numCols: Int, entries: Array[((Int, Int), Double)]): SparseMatrix = { - val colPtrs = new ArrayBuffer[Int](numCols + 1) - colPtrs.append(0) + def fromCOO(numRows: Int, numCols: Int, entries: Array[(Int, Int, Double)]): SparseMatrix = { + val sortedEntries = entries.sortBy(v => (v._2, v._1)) + val colPtrs = new Array[Int](numCols + 1) var nnz = 0 - var lastCol = 0 - val values = entries.map { case ((i, j), v) => + var lastCol = -1 + val values = sortedEntries.map { case (i, j, v) => while (j != lastCol) { - colPtrs.append(nnz) + colPtrs(lastCol + 1) = nnz lastCol += 1 - if (lastCol > numCols) { - throw new IndexOutOfBoundsException("Please make sure that the entries array is " + - "sorted by COLUMN index first and then by row index.") - } } nnz += 1 v } while (numCols > lastCol) { - colPtrs.append(nnz) + colPtrs(lastCol + 1) = nnz lastCol += 1 } - new SparseMatrix(numRows, numCols, colPtrs.toArray, entries.map(_._1._1), values) + new SparseMatrix(numRows, numCols, colPtrs.toArray, sortedEntries.map(_._1), values) } /** @@ -411,17 +410,42 @@ object SparseMatrix { val length = math.ceil(numRows * numCols * density).toInt val entries = Map[(Int, Int), Double]() var i = 0 - while (i < length) { - var rowIndex = rng.nextInt(numRows) - var colIndex = rng.nextInt(numCols) - while (entries.contains((rowIndex, colIndex))) { - rowIndex = rng.nextInt(numRows) - colIndex = rng.nextInt(numCols) + // Expected number of iterations is less than 1.5 * length + if (density < 0.34) { + while (i < length) { + var rowIndex = rng.nextInt(numRows) + var colIndex = rng.nextInt(numCols) + while (entries.contains((rowIndex, colIndex))) { + rowIndex = rng.nextInt(numRows) + colIndex = rng.nextInt(numCols) + } + entries += (rowIndex, colIndex) -> method(rng) + i += 1 + } + } else { // selection - rejection method + var j = 0 + val triesPerCol = math.ceil(length * 1.0 / numCols).toInt + val pool = numRows * numCols + // loop over columns so that the sort in fromCOO requires less sorting + while (i < length && j < numCols) { + var k = 0 + val leftFromPool = (numCols - j) * numRows + while (k < triesPerCol) { + if (rng.nextDouble() < 1.0 * (length - i) / (pool - leftFromPool)) { + var rowIndex = rng.nextInt(numRows) + val colIndex = j + while (entries.contains((rowIndex, colIndex))) { + rowIndex = rng.nextInt(numRows) + } + entries += (rowIndex, colIndex) -> method(rng) + i += 1 + } + k += 1 + } + j += 1 } - entries += (rowIndex, colIndex) -> method(rng) - i += 1 } - SparseMatrix.fromCOO(numRows, numCols, entries.toArray.sortBy(v => (v._1._2, v._1._1))) + SparseMatrix.fromCOO(numRows, numCols, entries.toArray.map(v => (v._1._1, v._1._2, v._2))) } /** @@ -462,12 +486,12 @@ object SparseMatrix { val n = vector.size vector match { case sVec: SparseVector => - val indices = sVec.indices.map(i => (i, i)) - SparseMatrix.fromCOO(n, n, indices.zip(sVec.values)) + val indices = sVec.indices + SparseMatrix.fromCOO(n, n, indices.zip(sVec.values).map(v => (v._1, v._1, v._2))) case dVec: DenseVector => val values = dVec.values.zipWithIndex val nnzVals = values.filter(v => v._1 != 0.0) - SparseMatrix.fromCOO(n, n, nnzVals.map(v => ((v._2, v._2), v._1))) + SparseMatrix.fromCOO(n, n, nnzVals.map(v => (v._2, v._2, v._1))) } } } @@ -613,21 +637,20 @@ object 
Matrices { * @return a single `Matrix` composed of the matrices that were horizontally concatenated */ def horzcat(matrices: Array[Matrix]): Matrix = { - if (matrices.size == 1) { - return matrices(0) - } else if (matrices.size == 0) { + if (matrices.isEmpty) { return new DenseMatrix(0, 0, Array[Double]()) + } else if (matrices.size == 1) { + return matrices(0) } val numRows = matrices(0).numRows var rowsMatch = true - var hasDense = false var hasSparse = false var numCols = 0 matrices.foreach { mat => if (numRows != mat.numRows) rowsMatch = false mat match { case sparse: SparseMatrix => hasSparse = true - case dense: DenseMatrix => hasDense = true + case dense: DenseMatrix => // empty on purpose case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " + s"SparseMatrix or DenseMatrix. Instead got: ${mat.getClass}") } @@ -635,36 +658,49 @@ object Matrices { } require(rowsMatch, "The number of rows of the matrices in this sequence, don't match!") - if (!hasSparse && hasDense) { + if (!hasSparse) { new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray)) } else { var startCol = 0 - val entries: Array[((Int, Int), Double)] = matrices.flatMap { + val entries: Array[(Int, Int, Double)] = matrices.flatMap { case spMat: SparseMatrix => var j = 0 - var cnt = 0 - val ptr = spMat.colPtrs - val data = spMat.rowIndices.zip(spMat.values).map { case (i, v) => - cnt += 1 - if (cnt <= ptr(j + 1)) { - ((i, j + startCol), v) - } else { - while (ptr(j + 1) < cnt) { - j += 1 - } - ((i, j + startCol), v) + val colPtrs = spMat.colPtrs + val rowIndices = spMat.rowIndices + val values = spMat.values + val data = new Array[(Int, Int, Double)](values.length) + val nCols = spMat.numCols + while (j < nCols) { + var idx = colPtrs(j) + while (idx < colPtrs(j + 1)) { + val i = rowIndices(idx) + val v = values(idx) + data(idx) = (i, j + startCol, v) + idx += 1 } + j += 1 } - startCol += spMat.numCols + startCol += nCols data case dnMat: DenseMatrix => - val nnzValues = dnMat.values.zipWithIndex.filter(v => v._1 != 0.0) - val data = nnzValues.map { case (v, i) => - val rowIndex = i % dnMat.numRows - val colIndex = i / dnMat.numRows - ((rowIndex, colIndex + startCol), v) + val data = new ArrayBuffer[(Int, Int, Double)]() + var j = 0 + val nCols = dnMat.numCols + val nRows = dnMat.numRows + val values = dnMat.values + while (j < nCols) { + var i = 0 + val indStart = j * nRows + while (i < nRows) { + val v = values(indStart + i) + if (v != 0.0) { + data.append((i, j + startCol, v)) + } + i += 1 + } + j += 1 } - startCol += dnMat.numCols + startCol += nCols data } SparseMatrix.fromCOO(numRows, numCols, entries) @@ -679,14 +715,13 @@ object Matrices { * @return a single `Matrix` composed of the matrices that were vertically concatenated */ def vertcat(matrices: Array[Matrix]): Matrix = { - if (matrices.size == 1) { - return matrices(0) - } else if (matrices.size == 0) { + if (matrices.isEmpty) { return new DenseMatrix(0, 0, Array[Double]()) + } else if (matrices.size == 1) { + return matrices(0) } val numCols = matrices(0).numCols var colsMatch = true - var hasDense = false var hasSparse = false var numRows = 0 var valsLength = 0 @@ -697,7 +732,6 @@ object Matrices { hasSparse = true valsLength += sparse.values.length case dense: DenseMatrix => - hasDense = true valsLength += dense.values.length case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " + s"SparseMatrix or DenseMatrix. 
Instead got: ${mat.getClass}") @@ -707,7 +741,7 @@ object Matrices { } require(colsMatch, "The number of rows of the matrices in this sequence, don't match!") - if (!hasSparse && hasDense) { + if (!hasSparse) { val matData = matrices.zipWithIndex.flatMap { case (mat, ind) => val values = mat.toArray for (j <- 0 until numCols) yield (j, ind, @@ -716,34 +750,46 @@ object Matrices { new DenseMatrix(numRows, numCols, matData.flatMap(_._3)) } else { var startRow = 0 - val entries: Array[((Int, Int), Double)] = matrices.flatMap { + val entries: Array[(Int, Int, Double)] = matrices.flatMap { case spMat: SparseMatrix => var j = 0 - var cnt = 0 - val ptr = spMat.colPtrs - val data = spMat.rowIndices.zip(spMat.values).map { case (i, v) => - cnt += 1 - if (cnt <= ptr(j + 1)) { - ((i + startRow, j), v) - } else { - while (ptr(j + 1) < cnt) { - j += 1 - } - ((i + startRow, j), v) + val colPtrs = spMat.colPtrs + val rowIndices = spMat.rowIndices + val values = spMat.values + val data = new Array[(Int, Int, Double)](values.length) + while (j < numCols) { + var idx = colPtrs(j) + while (idx < colPtrs(j + 1)) { + val i = rowIndices(idx) + val v = values(idx) + data(idx) = (i + startRow, j, v) + idx += 1 } + j += 1 } startRow += spMat.numRows data case dnMat: DenseMatrix => - val nnzValues = dnMat.values.zipWithIndex.filter(v => v._1 != 0.0) - val data = nnzValues.map { case (v, i) => - val rowIndex = i % dnMat.numRows - val colIndex = i / dnMat.numRows - ((rowIndex + startRow, colIndex), v) + val data = new ArrayBuffer[(Int, Int, Double)]() + var j = 0 + val nCols = dnMat.numCols + val nRows = dnMat.numRows + val values = dnMat.values + while (j < nCols) { + var i = 0 + val indStart = j * nRows + while (i < nRows) { + val v = values(indStart + i) + if (v != 0.0) { + data.append((i + startRow, j, v)) + } + i += 1 + } + j += 1 } - startRow += dnMat.numRows + startRow += nRows data - }.sortBy(d => (d._1._2, d._1._1)) + } SparseMatrix.fromCOO(numRows, numCols, entries) } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java index df811933a080b..704d484d0b585 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java @@ -99,6 +99,25 @@ public void zerosMatrixConstruction() { assertArrayEquals(done.toArray(), new double[]{1.0, 1.0, 1.0, 1.0}, 0.0); } + @Test + public void sparseDenseConversion() { + int m = 3; + int n = 2; + double[] values = new double[]{1.0, 2.0, 4.0, 5.0}; + double[] allValues = new double[]{1.0, 2.0, 0.0, 0.0, 4.0, 5.0}; + int[] colPtrs = new int[]{0, 2, 4}; + int[] rowIndices = new int[]{0, 1, 1, 2}; + + SparseMatrix spMat1 = new SparseMatrix(m, n, colPtrs, rowIndices, values); + DenseMatrix deMat1 = new DenseMatrix(m, n, allValues); + + SparseMatrix spMat2 = deMat1.toSparse(); + DenseMatrix deMat2 = spMat1.toDense(); + + assertArrayEquals(spMat1.toArray(), spMat2.toArray(), 0.0); + assertArrayEquals(deMat1.toArray(), deMat2.toArray(), 0.0); + } + @Test public void concatenateMatrices() { int m = 3; diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala index f5933ddcfca2e..bdb0f9e34bfd9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala @@ -54,8 +54,8 @@ class 
MatricesSuite extends FunSuite { assert(mat.colPtrs.eq(colPtrs), "should not copy data") assert(mat.rowIndices.eq(rowIndices), "should not copy data") - val entries: Array[((Int, Int), Double)] = Array(((1, 0), 1.0), ((2, 0), 2.0), - ((1, 2), 4.0), ((2, 2), 5.0)) + val entries: Array[(Int, Int, Double)] = Array((1, 0, 1.0), (2, 0, 2.0), + (1, 2, 4.0), (2, 2, 5.0)) val mat2 = SparseMatrix.fromCOO(m, n, entries) assert(mat.toBreeze === mat2.toBreeze) @@ -123,6 +123,24 @@ class MatricesSuite extends FunSuite { assert(sparseMat.values(2) === 10.0) } + test("toSparse, toDense") { + val m = 3 + val n = 2 + val values = Array(1.0, 2.0, 4.0, 5.0) + val allValues = Array(1.0, 2.0, 0.0, 0.0, 4.0, 5.0) + val colPtrs = Array(0, 2, 4) + val rowIndices = Array(0, 1, 1, 2) + + val spMat1 = new SparseMatrix(m, n, colPtrs, rowIndices, values) + val deMat1 = new DenseMatrix(m, n, allValues) + + val spMat2 = deMat1.toSparse() + val deMat2 = spMat1.toDense() + + assert(spMat1.toBreeze === spMat2.toBreeze) + assert(deMat1.toBreeze === deMat2.toBreeze) + } + test("map, update") { val m = 3 val n = 2 @@ -162,6 +180,8 @@ class MatricesSuite extends FunSuite { val spHorz3 = Matrices.horzcat(Array(deMat1, spMat2)) val deHorz1 = Matrices.horzcat(Array(deMat1, deMat2)) + val deHorz2 = Matrices.horzcat(Array[Matrix]()) + assert(deHorz1.numRows === 3) assert(spHorz2.numRows === 3) assert(spHorz3.numRows === 3) @@ -170,6 +190,9 @@ class MatricesSuite extends FunSuite { assert(spHorz2.numCols === 5) assert(spHorz3.numCols === 5) assert(spHorz.numCols === 5) + assert(deHorz2.numRows === 0) + assert(deHorz2.numCols === 0) + assert(deHorz2.toArray.length === 0) assert(deHorz1.toBreeze.toDenseMatrix === spHorz2.toBreeze.toDenseMatrix) assert(spHorz2.toBreeze === spHorz3.toBreeze) @@ -200,6 +223,7 @@ class MatricesSuite extends FunSuite { val deVert1 = Matrices.vertcat(Array(deMat1, deMat3)) val spVert2 = Matrices.vertcat(Array(spMat1, deMat3)) val spVert3 = Matrices.vertcat(Array(deMat1, spMat3)) + val deVert2 = Matrices.vertcat(Array[Matrix]()) assert(deVert1.numRows === 5) assert(spVert2.numRows === 5) @@ -209,6 +233,9 @@ class MatricesSuite extends FunSuite { assert(spVert2.numCols === 2) assert(spVert3.numCols === 2) assert(spVert.numCols === 2) + assert(deVert2.numRows === 0) + assert(deVert2.numCols === 0) + assert(deVert2.toArray.length === 0) assert(deVert1.toBreeze.toDenseMatrix === spVert2.toBreeze.toDenseMatrix) assert(spVert2.toBreeze === spVert3.toBreeze) From f62d6c795c6293817df195369e2873eb97b11a0e Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 19 Dec 2014 02:42:43 -0800 Subject: [PATCH 12/17] [SPARK-4409] Modified genRandMatrix --- .../apache/spark/mllib/linalg/Matrices.scala | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index 9d2611e0ed8e9..e4f90f3171b0b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg import java.util.{Arrays, Random} -import scala.collection.mutable.{ArrayBuffer, ArrayBuilder, Map} +import scala.collection.mutable.{ArrayBuffer, ArrayBuilder, Map => MutableMap} import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM} @@ -408,8 +408,15 @@ object SparseMatrix { require(density >= 0.0 && density <= 1.0, "density must be a double in the range 
" + s"0.0 <= d <= 1.0. Currently, density: $density") val length = math.ceil(numRows * numCols * density).toInt - val entries = Map[(Int, Int), Double]() + val entries = MutableMap[(Int, Int), Double]() var i = 0 + if (density == 0.0) { + return new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1), + Array[Int](), Array[Double]()) + } else if (density == 1.0) { + return new SparseMatrix(numRows, numCols, (0 to numRows * numCols by numRows).toArray, + (0 until numRows * numCols).toArray, Array.fill(numRows * numCols)(method(rng))) + } // Expected number of iterations is less than 1.5 * length if (density < 0.34) { while (i < length) { @@ -424,23 +431,18 @@ object SparseMatrix { } } else { // selection - rejection method var j = 0 - val triesPerCol = math.ceil(length * 1.0 / numCols).toInt val pool = numRows * numCols // loop over columns so that the sort in fromCOO requires less sorting while (i < length && j < numCols) { - var k = 0 - val leftFromPool = (numCols - j) * numRows - while (k < triesPerCol) { - if (rng.nextDouble() < 1.0 * (length - i) / (pool - leftFromPool)) { - var rowIndex = rng.nextInt(numRows) - val colIndex = j - while (entries.contains((rowIndex, colIndex))) { - rowIndex = rng.nextInt(numRows) - } - entries += (rowIndex, colIndex) -> method(rng) + var passedInPool = j * numRows + var r = 0 + while (i < length && r < numRows) { + if (rng.nextDouble() < 1.0 * (length - i) / (pool - passedInPool)) { + entries += (r, j) -> method(rng) i += 1 } - k += 1 + r += 1 + passedInPool += 1 } j += 1 } From 10a63a6e6b583e6e79ced58ea9b73937656f5a24 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Sat, 20 Dec 2014 00:43:52 -0800 Subject: [PATCH 13/17] [SPARK-4409] Fourth pass of code review --- .../apache/spark/mllib/linalg/Matrices.scala | 145 ++++++++++-------- .../spark/mllib/linalg/MatricesSuite.scala | 14 +- 2 files changed, 92 insertions(+), 67 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index e4f90f3171b0b..4a6cfde0634cf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg import java.util.{Arrays, Random} -import scala.collection.mutable.{ArrayBuffer, ArrayBuilder, Map => MutableMap} +import scala.collection.mutable.{ArrayBuilder => MArrayBuilder, HashSet => MHashSet, ArrayBuffer} import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM} @@ -150,11 +150,10 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) /** Generate a `SparseMatrix` from the given `DenseMatrix`. 
*/ def toSparse(): SparseMatrix = { - val spVals: ArrayBuilder[Double] = new ArrayBuilder.ofDouble + val spVals: MArrayBuilder[Double] = new MArrayBuilder.ofDouble val colPtrs: Array[Int] = new Array[Int](numCols + 1) - val rowIndices: ArrayBuilder[Int] = new ArrayBuilder.ofInt + val rowIndices: MArrayBuilder[Int] = new MArrayBuilder.ofInt var nnz = 0 - var lastCol = -1 var j = 0 while (j < numCols) { var i = 0 @@ -164,19 +163,12 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double]) if (v != 0.0) { rowIndices += i spVals += v - while (j != lastCol) { - colPtrs(lastCol + 1) = nnz - lastCol += 1 - } nnz += 1 } i += 1 } j += 1 - } - while (numCols > lastCol) { - colPtrs(lastCol + 1) = nnz - lastCol += 1 + colPtrs(j) = nnz } new SparseMatrix(numRows, numCols, colPtrs, rowIndices.result(), spVals.result()) } @@ -362,10 +354,11 @@ object SparseMatrix { /** * Generate a `SparseMatrix` from Coordinate List (COO) format. Input must be an array of - * (row, column, value) tuples. + * (i, j, value) tuples. Entries that have duplicate values of i and j are + * added together. Tuples where value is equal to zero will be omitted. * @param numRows number of rows of the matrix * @param numCols number of columns of the matrix - * @param entries Array of (row, column, value) tuples + * @param entries Array of (i, j, value) tuples * @return The corresponding `SparseMatrix` */ def fromCOO(numRows: Int, numCols: Int, entries: Array[(Int, Int, Double)]): SparseMatrix = { @@ -373,19 +366,42 @@ object SparseMatrix { val colPtrs = new Array[Int](numCols + 1) var nnz = 0 var lastCol = -1 - val values = sortedEntries.map { case (i, j, v) => - while (j != lastCol) { - colPtrs(lastCol + 1) = nnz - lastCol += 1 + var lastIndex = -1 + sortedEntries.foreach { case (i, j, v) => + require(i >= 0 && j >= 0, "Negative indices given. Please make sure all indices are " + + s"greater than or equal to zero. i: $i, j: $j, value: $v") + if (v != 0.0) { + while (j != lastCol) { + colPtrs(lastCol + 1) = nnz + lastCol += 1 + } + val index = j * numRows + i + if (lastIndex != index) { + nnz += 1 + lastIndex = index + } } - nnz += 1 - v } while (numCols > lastCol) { colPtrs(lastCol + 1) = nnz lastCol += 1 } - new SparseMatrix(numRows, numCols, colPtrs.toArray, sortedEntries.map(_._1), values) + val values = new Array[Double](nnz) + val rowIndices = new Array[Int](nnz) + lastIndex = -1 + var cnt = -1 + sortedEntries.foreach { case (i, j, v) => + if (v != 0.0) { + val index = j * numRows + i + if (lastIndex != index) { + cnt += 1 + lastIndex = index + } + values(cnt) += v + rowIndices(cnt) = i + } + } + new SparseMatrix(numRows, numCols, colPtrs.toArray, rowIndices, values) } /** @@ -397,57 +413,54 @@ object SparseMatrix { new SparseMatrix(n, n, (0 to n).toArray, (0 until n).toArray, Array.fill(n)(1.0)) } - /** Generates a `SparseMatrix` with a given random number generator and `method`, which - * specifies the distribution. */ + /** Generates the skeleton of a random `SparseMatrix` with a given random number generator. */ private def genRandMatrix( numRows: Int, numCols: Int, density: Double, - rng: Random, - method: Random => Double): SparseMatrix = { + rng: Random): SparseMatrix = { require(density >= 0.0 && density <= 1.0, "density must be a double in the range " + s"0.0 <= d <= 1.0. 
Currently, density: $density") val length = math.ceil(numRows * numCols * density).toInt - val entries = MutableMap[(Int, Int), Double]() var i = 0 if (density == 0.0) { return new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1), Array[Int](), Array[Double]()) } else if (density == 1.0) { + val rowIndices = Array.tabulate(numCols, numRows)((j, i) => i).flatten return new SparseMatrix(numRows, numCols, (0 to numRows * numCols by numRows).toArray, - (0 until numRows * numCols).toArray, Array.fill(numRows * numCols)(method(rng))) + rowIndices, new Array[Double](numRows * numCols)) } - // Expected number of iterations is less than 1.5 * length - if (density < 0.34) { - while (i < length) { - var rowIndex = rng.nextInt(numRows) - var colIndex = rng.nextInt(numCols) - while (entries.contains((rowIndex, colIndex))) { - rowIndex = rng.nextInt(numRows) - colIndex = rng.nextInt(numCols) - } - entries += (rowIndex, colIndex) -> method(rng) - i += 1 + if (density < 0.34) { // Expected number of iterations is less than 1.5 * length + val entries = MHashSet[(Int, Int)]() + while (entries.size < length) { + entries += ((rng.nextInt(numRows), rng.nextInt(numCols))) } + val entryList = entries.toArray.map(v => (v._1, v._2, 1.0)) + SparseMatrix.fromCOO(numRows, numCols, entryList) } else { // selection - rejection method var j = 0 val pool = numRows * numCols - // loop over columns so that the sort in fromCOO requires less sorting + val rowIndexBuilder = new MArrayBuilder.ofInt + val colPtrs = new Array[Int](numCols + 1) while (i < length && j < numCols) { var passedInPool = j * numRows var r = 0 while (i < length && r < numRows) { if (rng.nextDouble() < 1.0 * (length - i) / (pool - passedInPool)) { - entries += (r, j) -> method(rng) + rowIndexBuilder += r i += 1 } r += 1 passedInPool += 1 } j += 1 + colPtrs(j) = i } + val rowIndices = rowIndexBuilder.result() + new SparseMatrix(numRows, numCols, colPtrs, rowIndices, new Array[Double](rowIndices.size)) } - SparseMatrix.fromCOO(numRows, numCols, entries.toArray.map(v => (v._1._1, v._1._2, v._2))) + } /** @@ -461,8 +474,8 @@ object SparseMatrix { * @return `SparseMatrix` with size `numRows` x `numCols` and values in U(0, 1) */ def sprand(numRows: Int, numCols: Int, density: Double, rng: Random): SparseMatrix = { - def method(rand: Random): Double = rand.nextDouble() - genRandMatrix(numRows, numCols, density, rng, method) + val mat = genRandMatrix(numRows, numCols, density, rng) + mat.update(i => rng.nextDouble()) } /** @@ -474,8 +487,8 @@ object SparseMatrix { * @return `SparseMatrix` with size `numRows` x `numCols` and values in N(0, 1) */ def sprandn(numRows: Int, numCols: Int, density: Double, rng: Random): SparseMatrix = { - def method(rand: Random): Double = rand.nextGaussian() - genRandMatrix(numRows, numCols, density, rng, method) + val mat = genRandMatrix(numRows, numCols, density, rng) + mat.update(i => rng.nextGaussian()) } /** @@ -488,11 +501,10 @@ object SparseMatrix { val n = vector.size vector match { case sVec: SparseVector => - val indices = sVec.indices - SparseMatrix.fromCOO(n, n, indices.zip(sVec.values).map(v => (v._1, v._1, v._2))) + SparseMatrix.fromCOO(n, n, sVec.indices.zip(sVec.values).map(v => (v._1, v._1, v._2))) case dVec: DenseVector => - val values = dVec.values.zipWithIndex - val nnzVals = values.filter(v => v._1 != 0.0) + val entries = dVec.values.zipWithIndex + val nnzVals = entries.filter(v => v._1 != 0.0) SparseMatrix.fromCOO(n, n, nnzVals.map(v => (v._2, v._2, v._1))) } } @@ -645,11 +657,11 @@ object Matrices { 
return matrices(0) } val numRows = matrices(0).numRows - var rowsMatch = true var hasSparse = false var numCols = 0 matrices.foreach { mat => - if (numRows != mat.numRows) rowsMatch = false + require(numRows == mat.numRows, "The number of rows of the matrices in this sequence, " + + "don't match!") mat match { case sparse: SparseMatrix => hasSparse = true case dense: DenseMatrix => // empty on purpose @@ -658,8 +670,6 @@ object Matrices { } numCols += mat.numCols } - require(rowsMatch, "The number of rows of the matrices in this sequence, don't match!") - if (!hasSparse) { new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray)) } else { @@ -723,12 +733,12 @@ object Matrices { return matrices(0) } val numCols = matrices(0).numCols - var colsMatch = true var hasSparse = false var numRows = 0 var valsLength = 0 matrices.foreach { mat => - if (numCols != mat.numCols) colsMatch = false + require(numCols == mat.numCols, "The number of rows of the matrices in this sequence, " + + "don't match!") mat match { case sparse: SparseMatrix => hasSparse = true @@ -741,15 +751,26 @@ object Matrices { numRows += mat.numRows } - require(colsMatch, "The number of rows of the matrices in this sequence, don't match!") - if (!hasSparse) { - val matData = matrices.zipWithIndex.flatMap { case (mat, ind) => + val allValues = new Array[Double](numRows * numCols) + var startRow = 0 + matrices.foreach { mat => + var j = 0 + val nRows = mat.numRows val values = mat.toArray - for (j <- 0 until numCols) yield (j, ind, - values.slice(j * mat.numRows, (j + 1) * mat.numRows)) - }.sortBy(x => (x._1, x._2)) - new DenseMatrix(numRows, numCols, matData.flatMap(_._3)) + while (j < numCols) { + var i = 0 + val indStart = j * numRows + startRow + val subMatStart = j * nRows + while (i < nRows) { + allValues(indStart + i) = values(subMatStart + i) + i += 1 + } + j += 1 + } + startRow += nRows + } + new DenseMatrix(numRows, numCols, allValues) } else { var startRow = 0 val entries: Array[(Int, Int, Double)] = matrices.flatMap { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala index bdb0f9e34bfd9..a35d0fe389fdd 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala @@ -54,11 +54,12 @@ class MatricesSuite extends FunSuite { assert(mat.colPtrs.eq(colPtrs), "should not copy data") assert(mat.rowIndices.eq(rowIndices), "should not copy data") - val entries: Array[(Int, Int, Double)] = Array((1, 0, 1.0), (2, 0, 2.0), - (1, 2, 4.0), (2, 2, 5.0)) + val entries: Array[(Int, Int, Double)] = Array((2, 2, 3.0), (1, 0, 1.0), (2, 0, 2.0), + (1, 2, 2.0), (2, 2, 2.0), (1, 2, 2.0), (0, 0, 0.0)) val mat2 = SparseMatrix.fromCOO(m, n, entries) assert(mat.toBreeze === mat2.toBreeze) + assert(mat2.values.length == 4) } test("sparse matrix construction with wrong number of elements") { @@ -308,12 +309,15 @@ class MatricesSuite extends FunSuite { test("sprand") { val rng = mock[Random] when(rng.nextInt(4)).thenReturn(0, 1, 1, 3, 2, 2, 0, 1, 3, 0) - when(rng.nextDouble()).thenReturn(1.0, 2.0, 3.0, 4.0) + when(rng.nextDouble()).thenReturn(1.0, 2.0, 3.0, 4.0, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0) val mat = SparseMatrix.sprand(4, 4, 0.25, rng) assert(mat.numRows === 4) assert(mat.numCols === 4) assert(mat.rowIndices.toSeq === Seq(3, 0, 2, 1)) - assert(mat.values.toSeq === Seq(4.0, 1.0, 3.0, 2.0)) + assert(mat.values.toSeq === Seq(1.0, 2.0, 3.0, 4.0)) 
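For reference, the stubbed draws above map onto these assertions as follows:

    // nextInt(4) is consumed in (row, col) pairs: (0,1), (1,3), (2,2), then (0,1) again,
    // which is a duplicate and grows nothing, and finally (3,0) completes the 4 positions.
    // Sorted by (column, row) for CSC: (3,0), (0,1), (2,2), (1,3) -> rowIndices = [3, 0, 2, 1],
    // and nextDouble() then fills the values in that order: [1.0, 2.0, 3.0, 4.0].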
+ val mat2 = SparseMatrix.sprand(2, 3, 1.0, rng) + assert(mat2.rowIndices.toSeq === Seq(0, 1, 0, 1, 0, 1)) + assert(mat2.colPtrs.toSeq === Seq(0, 2, 4, 6)) } test("sprandn") { @@ -324,6 +328,6 @@ class MatricesSuite extends FunSuite { assert(mat.numRows === 4) assert(mat.numCols === 4) assert(mat.rowIndices.toSeq === Seq(3, 0, 2, 1)) - assert(mat.values.toSeq === Seq(4.0, 1.0, 3.0, 2.0)) + assert(mat.values.toSeq === Seq(1.0, 2.0, 3.0, 4.0)) } } From 4e95e24d46453377cf17cc257db1e1cf5d920a69 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 23 Dec 2014 12:56:33 -0800 Subject: [PATCH 14/17] simplify fromCOO implementation --- .../apache/spark/mllib/linalg/Matrices.scala | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index 4a6cfde0634cf..076aeaf5a987c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -362,46 +362,46 @@ object SparseMatrix { * @return The corresponding `SparseMatrix` */ def fromCOO(numRows: Int, numCols: Int, entries: Array[(Int, Int, Double)]): SparseMatrix = { + val numEntries = entries.size val sortedEntries = entries.sortBy(v => (v._2, v._1)) - val colPtrs = new Array[Int](numCols + 1) - var nnz = 0 - var lastCol = -1 - var lastIndex = -1 - sortedEntries.foreach { case (i, j, v) => - require(i >= 0 && j >= 0, "Negative indices given. Please make sure all indices are " + - s"greater than or equal to zero. i: $i, j: $j, value: $v") - if (v != 0.0) { - while (j != lastCol) { - colPtrs(lastCol + 1) = nnz - lastCol += 1 - } - val index = j * numRows + i - if (lastIndex != index) { - nnz += 1 - lastIndex = index - } + if (sortedEntries.nonEmpty) { + // Since the entries are sorted by column index, we only need to check the first and the last. + for (col <- Seq(sortedEntries.head._2, sortedEntries.last._2)) { + require(col >= 0 && col < numCols, s"Column index out of range [0, $numCols): $col.") } } - while (numCols > lastCol) { - colPtrs(lastCol + 1) = nnz - lastCol += 1 - } - val values = new Array[Double](nnz) - val rowIndices = new Array[Int](nnz) - lastIndex = -1 - var cnt = -1 - sortedEntries.foreach { case (i, j, v) => - if (v != 0.0) { - val index = j * numRows + i - if (lastIndex != index) { - cnt += 1 - lastIndex = index + val colPtrs = new Array[Int](numCols + 1) + val rowIndices = MArrayBuilder.make[Int] + rowIndices.sizeHint(numEntries) + val values = MArrayBuilder.make[Double] + values.sizeHint(numEntries) + var nnz = 0 + var prevCol = 0 + var prevRow = -1 + var prevVal = 0.0 + // Append a dummy entry to include the last one at the end of the loop. 
+ (sortedEntries.view :+ (numRows, numCols, 1.0)).foreach { case (i, j, v) => + if (v != 0) { + if (i == prevRow && j == prevCol) { + prevVal += v + } else { + if (prevVal != 0) { + require(prevRow >= 0 && prevRow < numRows, + s"Row index out of range [0, $numRows): $prevRow.") + nnz += 1 + rowIndices += prevRow + values += prevVal + } + prevRow = i + prevVal = v + while (prevCol < j) { + colPtrs(prevCol + 1) = nnz + prevCol += 1 + } } - values(cnt) += v - rowIndices(cnt) = i } } - new SparseMatrix(numRows, numCols, colPtrs.toArray, rowIndices, values) + new SparseMatrix(numRows, numCols, colPtrs, rowIndices.result(), values.result()) } /** From ecc937a1223dbca73a29e30e3b593edbfa341229 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 23 Dec 2014 13:44:14 -0800 Subject: [PATCH 15/17] update sprand --- .../apache/spark/mllib/linalg/Matrices.scala | 73 ++++++++++--------- 1 file changed, 39 insertions(+), 34 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index 076aeaf5a987c..82268596b8eb1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -361,9 +361,9 @@ object SparseMatrix { * @param entries Array of (i, j, value) tuples * @return The corresponding `SparseMatrix` */ - def fromCOO(numRows: Int, numCols: Int, entries: Array[(Int, Int, Double)]): SparseMatrix = { - val numEntries = entries.size - val sortedEntries = entries.sortBy(v => (v._2, v._1)) + def fromCOO(numRows: Int, numCols: Int, entries: Iterable[(Int, Int, Double)]): SparseMatrix = { + val sortedEntries = entries.toSeq.sortBy(v => (v._2, v._1)) + val numEntries = sortedEntries.size if (sortedEntries.nonEmpty) { // Since the entries are sorted by column index, we only need to check the first and the last. for (col <- Seq(sortedEntries.head._2, sortedEntries.last._2)) { @@ -413,54 +413,59 @@ object SparseMatrix { new SparseMatrix(n, n, (0 to n).toArray, (0 until n).toArray, Array.fill(n)(1.0)) } - /** Generates the skeleton of a random `SparseMatrix` with a given random number generator. */ + /** + * Generates the skeleton of a random `SparseMatrix` with a given random number generator. + * The values of the matrix returned are undefined. + */ private def genRandMatrix( numRows: Int, numCols: Int, density: Double, rng: Random): SparseMatrix = { - require(density >= 0.0 && density <= 1.0, "density must be a double in the range " + - s"0.0 <= d <= 1.0. Currently, density: $density") - val length = math.ceil(numRows * numCols * density).toInt - var i = 0 + require(numRows > 0, s"numRows must be greater than 0 but got $numRows") + require(numCols > 0, s"numCols must be greater than 0 but got $numCols") + require(density >= 0.0 && density <= 1.0, + s"density must be a double in the range 0.0 <= d <= 1.0. 
From 80cfa2948405cdc0109ead06ef1338ddf433f58a Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 23 Dec 2014 13:52:08 -0800
Subject: [PATCH 16/17] minor changes

---
 .../main/scala/org/apache/spark/mllib/linalg/Matrices.scala | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 82268596b8eb1..733112c1fcc86 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -740,16 +740,13 @@ object Matrices {
     val numCols = matrices(0).numCols
     var hasSparse = false
     var numRows = 0
-    var valsLength = 0
     matrices.foreach { mat =>
       require(numCols == mat.numCols, "The number of rows of the matrices in this sequence, " +
         "don't match!")
       mat match {
         case sparse: SparseMatrix =>
           hasSparse = true
-          valsLength += sparse.values.length
         case dense: DenseMatrix =>
-          valsLength += dense.values.length
         case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " +
           s"SparseMatrix or DenseMatrix. Instead got: ${mat.getClass}")
       }
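
The valsLength counters removed here only pre-computed how many values the input matrices store in total, a number that is already available as values.length on each operand (for CSC matrices, equivalently colPtrs(numCols)), and a one-pass rebuild of the result has no separate use for it up front, which is presumably why it could be dropped as a minor change. For reference, a standalone sketch, with hypothetical names, of walking the stored entries of one CSC matrix; re-emitting such (row, column, value) triples with an offset into a COO-to-CSC builder like fromCOO is one way to concatenate sparse matrices without any length bookkeeping:

    // Hypothetical helper (not part of the patch): visit every stored entry of a
    // matrix in CSC form. Column j owns the slice colPtrs(j) until colPtrs(j + 1),
    // so colPtrs(numCols) is the total number of stored values.
    object CscTraversalSketch {
      def foreachStored(
          numCols: Int,
          colPtrs: Array[Int],
          rowIndices: Array[Int],
          values: Array[Double])(f: (Int, Int, Double) => Unit): Unit = {
        var j = 0
        while (j < numCols) {
          var k = colPtrs(j)
          while (k < colPtrs(j + 1)) {
            f(rowIndices(k), j, values(k))
            k += 1
          }
          j += 1
        }
      }
    }
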
From b0354f616f7f49ee9b19f6b8e5d0dc775b05dba2 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 23 Dec 2014 23:33:51 -0800
Subject: [PATCH 17/17] [SPARK-4409] Incorporated mengxr's code

---
 .../src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 733112c1fcc86..5a7281ec6dc3c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -448,11 +448,11 @@ object SparseMatrix {
       // selection-rejection method
       var idx = 0L
       var numSelected = 0
-      var i = 0
       var j = 0
       val colPtrs = new Array[Int](numCols + 1)
       val rowIndices = new Array[Int](nnz)
       while (j < numCols && numSelected < nnz) {
+        var i = 0
         while (i < numRows && numSelected < nnz) {
           if (rng.nextDouble() < 1.0 * (nnz - numSelected) / (size - idx)) {
             rowIndices(numSelected) = i