Skip to content

Commit 39b0b22

Browse files
committed
initial draft for discussion
1 parent fc8feaa commit 39b0b22

File tree

1 file changed

+59
-0
lines changed
  • mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed

1 file changed

+59
-0
lines changed

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,65 @@ class RowMatrix(
497497
columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma)
498498
}
499499

500+
/**
501+
* Compute QR decomposition for rowMatrix. The implementation is designed to optimize the QR
502+
* decomposition (factorizations) for the RowMatrix of a tall and skinny shape, yet it applies
503+
* to RowMatrix in general.
504+
*
505+
* Reference:
506+
* Austin R. Benson, David F. Gleich, James Demmel. "Direct QR factorizations for tall-and
507+
* -skinny matrices in MapReduce architectures", 2013 IEEE International Conference on Big Data
508+
* @param computeQ: whether to computeQ, which is quite expensive.
509+
* @return the decomposition result as (Option[Q], R), where Q is a RowMatrix and R is Matrix.
510+
*/
511+
def TSQR(computeQ: Boolean = false): (Option[RowMatrix], Matrix) = {
512+
val col = numCols().toInt
513+
514+
// split rows horizontally into smaller matrices, and compute QR for each of them
515+
val blockQRs = rows.mapPartitions(rowsIterator =>{
516+
val partRows = rowsIterator.toArray
517+
val rowCount = partRows.size
518+
var bdm = BDM.zeros[Double](partRows.size, col)
519+
var i = 0
520+
partRows.foreach(row =>{
521+
bdm(i, ::) := row.toBreeze.t
522+
i += 1
523+
})
524+
525+
val blockQR = breeze.linalg.qr.reduced(bdm)
526+
Iterator((blockQR.r, blockQR.q))
527+
}).cache
528+
529+
// combine the R part from previous results horizontally into a tall matrix
530+
val blockRsRdd = blockQRs.map(_._1).collect()
531+
val CombinedR = blockRsRdd.reduceLeft((r1, r2) => BDM.vertcat(r1, r2))
532+
533+
val CombinedRDecomposition = breeze.linalg.qr.reduced(CombinedR)
534+
val finalR = Matrices.fromBreeze(CombinedRDecomposition.r.toDenseMatrix)
535+
536+
val finalQ = if(computeQ){
537+
val blockQ = blockQRs.map(_._2)
538+
val rightPartQ = CombinedRDecomposition.q
539+
val rightQArray = (0 until blockQ.count().toInt)
540+
.map(i => rightPartQ(i * col until (i + 1) * col, ::))
541+
.toArray
542+
val rightQrdd = blockQ.context.parallelize(rightQArray)
543+
544+
val qProducts = blockQ.zip(rightQrdd).map(m => m._1 * m._2)
545+
val newRows = qProducts.flatMap(m => {
546+
val row = m.rows
547+
(0 until row).map(i =>{
548+
val bv = m(i, ::).t
549+
Vectors.fromBreeze(bv)
550+
})
551+
})
552+
Some(new RowMatrix(newRows))
553+
}
554+
else None
555+
556+
(finalQ, finalR)
557+
}
558+
500559
/**
501560
* Find all similar columns using the DIMSUM sampling algorithm, described in two papers
502561
*

0 commit comments

Comments
 (0)