@@ -20,97 +20,51 @@ package org.apache.spark.mllib.linalg.distributed
2020import breeze .linalg .{DenseMatrix => BDM }
2121
2222import org .apache .spark ._
23- import org .apache .spark .mllib .linalg .DenseMatrix
23+ import org .apache .spark .mllib .linalg ._
2424import org .apache .spark .mllib .rdd .RDDFunctions ._
2525import org .apache .spark .rdd .RDD
2626import org .apache .spark .storage .StorageLevel
2727import org .apache .spark .util .Utils
2828
2929/**
30- * Represents a local matrix that makes up one block of a distributed BlockMatrix
31- *
32- * @param blockRowIndex The row index of this block. Must be zero based.
33- * @param blockColIndex The column index of this block. Must be zero based.
34- * @param mat The underlying local matrix
35- */
36- case class SubMatrix (blockRowIndex : Int , blockColIndex : Int , mat : DenseMatrix ) extends Serializable
37-
38- /**
39- * A partitioner that decides how the matrix is distributed in the cluster
30+ * A grid partitioner, which stores every block in a separate partition.
4031 *
41- * @param numPartitions Number of partitions
32+ * @param numRowBlocks Number of blocks that form the rows of the matrix.
33+ * @param numColBlocks Number of blocks that form the columns of the matrix.
4234 * @param rowPerBlock Number of rows that make up each block.
4335 * @param colPerBlock Number of columns that make up each block.
4436 */
45- private [mllib] abstract class BlockMatrixPartitioner (
46- override val numPartitions : Int ,
37+ private [mllib] class GridPartitioner (
38+ val numRowBlocks : Int ,
39+ val numColBlocks : Int ,
4740 val rowPerBlock : Int ,
48- val colPerBlock : Int ) extends Partitioner {
49- val name : String
41+ val colPerBlock : Int ,
42+ override val numPartitions : Int ) extends Partitioner {
5043
5144 /**
5245 * Returns the index of the partition the SubMatrix belongs to.
5346 *
54- * @param key The key for the SubMatrix. Can be its row index, column index or position in the
55- * grid.
47+ * @param key The key for the SubMatrix. Can be its position in the grid (its column major index)
48+ * or a tuple of three integers that are the final row index after the multiplication,
49+ * the index of the block to multiply with, and the final column index after the
50+ * multiplication.
5651 * @return The index of the partition, which the SubMatrix belongs to.
5752 */
5853 override def getPartition (key : Any ): Int = {
59- Utils .nonNegativeMod(key.asInstanceOf [Int ], numPartitions)
60- }
61- }
62-
63- /**
64- * A grid partitioner, which stores every block in a separate partition.
65- *
66- * @param numRowBlocks Number of blocks that form the rows of the matrix.
67- * @param numColBlocks Number of blocks that form the columns of the matrix.
68- * @param rowPerBlock Number of rows that make up each block.
69- * @param colPerBlock Number of columns that make up each block.
70- */
71- class GridPartitioner (
72- val numRowBlocks : Int ,
73- val numColBlocks : Int ,
74- override val rowPerBlock : Int ,
75- override val colPerBlock : Int )
76- extends BlockMatrixPartitioner (numRowBlocks * numColBlocks, rowPerBlock, colPerBlock) {
77-
78- override val name = " grid"
79-
80- override val numPartitions = numRowBlocks * numColBlocks
81-
82- /** Checks whether the partitioners have the same characteristics */
83- override def equals (obj : Any ): Boolean = {
84- obj match {
85- case r : GridPartitioner =>
86- (this .numPartitions == r.numPartitions) && (this .rowPerBlock == r.rowPerBlock) &&
87- (this .colPerBlock == r.colPerBlock)
54+ key match {
55+ case ind : (Int , Int ) =>
56+ Utils .nonNegativeMod(ind._1 + ind._2 * numRowBlocks, numPartitions)
57+ case indices : (Int , Int , Int ) =>
58+ Utils .nonNegativeMod(indices._1 + indices._3 * numRowBlocks, numPartitions)
8859 case _ =>
89- false
60+ throw new IllegalArgumentException ( " Unrecognized key " )
9061 }
9162 }
92- }
93-
94- /**
95- * A specialized partitioner that stores all blocks in the same row in just one partition.
96- *
97- * @param numPartitions Number of partitions. Should be set as the number of blocks that form
98- * the rows of the matrix.
99- * @param rowPerBlock Number of rows that make up each block.
100- * @param colPerBlock Number of columns that make up each block.
101- */
102- class RowBasedPartitioner (
103- override val numPartitions : Int ,
104- override val rowPerBlock : Int ,
105- override val colPerBlock : Int )
106- extends BlockMatrixPartitioner (numPartitions, rowPerBlock, colPerBlock) {
107-
108- override val name = " row"
10963
11064 /** Checks whether the partitioners have the same characteristics */
11165 override def equals (obj : Any ): Boolean = {
11266 obj match {
113- case r : RowBasedPartitioner =>
67+ case r : GridPartitioner =>
11468 (this .numPartitions == r.numPartitions) && (this .rowPerBlock == r.rowPerBlock) &&
11569 (this .colPerBlock == r.colPerBlock)
11670 case _ =>
@@ -119,36 +73,6 @@ class RowBasedPartitioner(
11973 }
12074}
12175
122- /**
123- * A specialized partitioner that stores all blocks in the same column in just one partition.
124- *
125- * @param numPartitions Number of partitions. Should be set as the number of blocks that form
126- * the columns of the matrix.
127- * @param rowPerBlock Number of rows that make up each block.
128- * @param colPerBlock Number of columns that make up each block.
129- */
130- class ColumnBasedPartitioner (
131- override val numPartitions : Int ,
132- override val rowPerBlock : Int ,
133- override val colPerBlock : Int )
134- extends BlockMatrixPartitioner (numPartitions, rowPerBlock, colPerBlock) {
135-
136- override val name = " column"
137-
138- /** Checks whether the partitioners have the same characteristics */
139- override def equals (obj : Any ): Boolean = {
140- obj match {
141- case p : ColumnBasedPartitioner =>
142- (this .numPartitions == p.numPartitions) && (this .rowPerBlock == p.rowPerBlock) &&
143- (this .colPerBlock == p.colPerBlock)
144- case r : RowBasedPartitioner =>
145- (this .numPartitions == r.numPartitions) && (this .colPerBlock == r.rowPerBlock)
146- case _ =>
147- false
148- }
149- }
150- }
151-
15276/**
15377 * Represents a distributed matrix in blocks of local matrices.
15478 *
@@ -159,7 +83,9 @@ class ColumnBasedPartitioner(
15983class BlockMatrix (
16084 val numRowBlocks : Int ,
16185 val numColBlocks : Int ,
162- val rdd : RDD [SubMatrix ]) extends DistributedMatrix with Logging {
86+ val rdd : RDD [((Int , Int ), Matrix )]) extends DistributedMatrix with Logging {
87+
88+ type SubMatrix = ((Int , Int ), Matrix ) // ((blockRowIndex, blockColIndex), matrix)
16389
16490 /**
16591 * Alternate constructor for BlockMatrix without the input of a partitioner. Will use a Grid
@@ -168,125 +94,92 @@ class BlockMatrix(
16894 * @param numRowBlocks Number of blocks that form the rows of this matrix
16995 * @param numColBlocks Number of blocks that form the columns of this matrix
17096 * @param rdd The RDD of SubMatrices (local matrices) that form this matrix
171- * @param partitioner A partitioner that specifies how SubMatrices are stored in the cluster
97+ * @param rowPerBlock Number of rows that make up each block.
98+ * @param colPerBlock Number of columns that make up each block.
17299 */
173100 def this (
174101 numRowBlocks : Int ,
175102 numColBlocks : Int ,
176- rdd : RDD [SubMatrix ],
177- partitioner : BlockMatrixPartitioner ) = {
103+ rdd : RDD [((Int , Int ), Matrix )],
104+ rowPerBlock : Int ,
105+ colPerBlock : Int ) = {
178106 this (numRowBlocks, numColBlocks, rdd)
179- setPartitioner(partitioner)
107+ val part = new GridPartitioner (numRowBlocks, numColBlocks, rowPerBlock, colPerBlock, rdd.partitions.length)
108+ setPartitioner(part)
180109 }
181110
182- private [mllib] var partitioner : BlockMatrixPartitioner = {
183- val firstSubMatrix = rdd.first().mat
111+ private [mllib] var partitioner : GridPartitioner = {
112+ val firstSubMatrix = rdd.first()._2
184113 new GridPartitioner (numRowBlocks, numColBlocks,
185- firstSubMatrix.numRows, firstSubMatrix.numCols)
114+ firstSubMatrix.numRows, firstSubMatrix.numCols, rdd.partitions.length )
186115 }
187116
188117 /**
189118 * Set the partitioner for the matrix. For internal use only. Users should use `repartition`.
190119 * @param part A partitioner that specifies how SubMatrices are stored in the cluster
191120 */
192- private def setPartitioner (part : BlockMatrixPartitioner ): Unit = {
121+ private def setPartitioner (part : GridPartitioner ): Unit = {
193122 partitioner = part
194123 }
195124
196- // A key-value pair RDD is required to partition properly
197- private var matrixRDD : RDD [(Int , SubMatrix )] = keyBy()
198-
199125 private lazy val dims : (Long , Long ) = getDim
200126
201127 override def numRows (): Long = dims._1
202128 override def numCols (): Long = dims._2
203129
204- if (partitioner.name.equals(" column" )) {
205- require(numColBlocks == partitioner.numPartitions, " The number of column blocks should match" +
206- s " the number of partitions of the column partitioner. numColBlocks: $numColBlocks, " +
207- s " partitioner.numPartitions: ${partitioner.numPartitions}" )
208- } else if (partitioner.name.equals(" row" )) {
209- require(numRowBlocks == partitioner.numPartitions, " The number of row blocks should match" +
210- s " the number of partitions of the row partitioner. numRowBlocks: $numRowBlocks, " +
211- s " partitioner.numPartitions: ${partitioner.numPartitions}" )
212- } else if (partitioner.name.equals(" grid" )) {
213- require(numRowBlocks * numColBlocks == partitioner.numPartitions, " The number of blocks " +
214- s " should match the number of partitions of the grid partitioner. numRowBlocks * " +
215- s " numColBlocks: ${numRowBlocks * numColBlocks}, " +
216- s " partitioner.numPartitions: ${partitioner.numPartitions}" )
217- } else {
218- throw new IllegalArgumentException (" Unrecognized partitioner." )
219- }
220-
221130 /** Returns the dimensions of the matrix. */
222131 def getDim : (Long , Long ) = {
223-
224- val firstRowColumn = rdd.filter(block => block.blockRowIndex == 0 || block.blockColIndex == 0 ).
225- map { block =>
226- ((block.blockRowIndex, block.blockColIndex), (block.mat.numRows, block.mat.numCols))
132+ // picks the sizes of the matrix with the maximum indices
133+ def pickSizeByGreaterIndex (example : (Int , Int , Int , Int ), base : (Int , Int , Int , Int )): (Int , Int , Int , Int ) = {
134+ if (example._1 > base._1 && example._2 > base._2) {
135+ (example._1, example._2, example._3, example._4)
136+ } else if (example._1 > base._1) {
137+ (example._1, base._2, example._3, base._4)
138+ } else if (example._2 > base._2) {
139+ (base._1, example._2, base._3, example._4)
140+ } else {
141+ (base._1, base._2, base._3, base._4)
227142 }
143+ }
228144
229- firstRowColumn.treeAggregate((0L , 0L ))(
230- seqOp = (c, v) => (c, v) match { case ((x_dim, y_dim), ((indX, indY), (nRow, nCol))) =>
231- if (indX == 0 && indY == 0 ) {
232- (x_dim + nRow, y_dim + nCol)
233- } else if (indX == 0 ) {
234- (x_dim, y_dim + nCol)
235- } else {
236- (x_dim + nRow, y_dim)
237- }
145+ val lastRowCol = rdd.treeAggregate((0 , 0 , 0 , 0 ))(
146+ seqOp = (c, v) => (c, v) match { case (base, ((blockXInd, blockYInd), mat)) =>
147+ pickSizeByGreaterIndex((blockXInd, blockYInd, mat.numRows, mat.numCols), base)
238148 },
239149 combOp = (c1, c2) => (c1, c2) match {
240- case ((x_dim1, y_dim1), (x_dim2, y_dim2) ) =>
241- (x_dim1 + x_dim2, y_dim1 + y_dim2 )
150+ case (res1, res2 ) =>
151+ pickSizeByGreaterIndex(res1, res2 )
242152 })
153+
154+ (lastRowCol._1.toLong * partitioner.rowPerBlock + lastRowCol._3,
155+ lastRowCol._2.toLong * partitioner.colPerBlock + lastRowCol._4)
243156 }
244157
245158 /** Returns the Frobenius Norm of the matrix */
246159 def normFro (): Double = {
247- math.sqrt(rdd.map(lm => lm.mat.values.map(x => math.pow(x, 2 )).sum).reduce(_ + _))
160+ math.sqrt(rdd.map {
161+ case sparse : ((Int , Int ), SparseMatrix ) =>
162+ sparse._2.values.map(x => math.pow(x, 2 )).sum
163+ case dense : ((Int , Int ), DenseMatrix ) =>
164+ dense._2.values.map(x => math.pow(x, 2 )).sum
165+ }.reduce(_ + _))
248166 }
249167
250168 /** Cache the underlying RDD. */
251169 def cache (): DistributedMatrix = {
252- matrixRDD .cache()
170+ rdd .cache()
253171 this
254172 }
255173
256174 /** Set the storage level for the underlying RDD. */
257175 def persist (storageLevel : StorageLevel ): DistributedMatrix = {
258- matrixRDD.persist(storageLevel)
259- this
260- }
261-
262- /** Add a key to the underlying rdd for partitioning and joins. */
263- private def keyBy (part : BlockMatrixPartitioner = partitioner): RDD [(Int , SubMatrix )] = {
264- rdd.map { block =>
265- part match {
266- case r : RowBasedPartitioner => (block.blockRowIndex, block)
267- case c : ColumnBasedPartitioner => (block.blockColIndex, block)
268- case g : GridPartitioner => (block.blockRowIndex + numRowBlocks * block.blockColIndex, block)
269- case _ => throw new IllegalArgumentException (" Unrecognized partitioner" )
270- }
271- }
272- }
273-
274- /**
275- * Repartition the BlockMatrix using a different partitioner.
276- *
277- * @param part The partitioner to partition by
278- * @return The repartitioned BlockMatrix
279- */
280- def repartition (part : BlockMatrixPartitioner ): DistributedMatrix = {
281- matrixRDD = keyBy(part)
282- setPartitioner(part)
176+ rdd.persist(storageLevel)
283177 this
284178 }
285179
286180 /** Collect the distributed matrix on the driver. */
287- def collect (): DenseMatrix = {
288- val parts = rdd.map(x => ((x.blockRowIndex, x.blockColIndex), x.mat)).
289- collect().sortBy(x => (x._1._2, x._1._1))
181+ def toLocalMatrix (): Matrix = {
182+ val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1))
290183 val nRows = numRows().toInt
291184 val nCols = numCols().toInt
292185 val values = new Array [Double ](nRows * nCols)
@@ -301,7 +194,7 @@ class BlockMatrix(
301194 val indStart = (j + colStart) * nRows + rowStart
302195 val indEnd = block.numRows
303196 val matStart = j * block.numRows
304- val mat = block.values
197+ val mat = block.toArray
305198 while (i < indEnd) {
306199 values(indStart + i) = mat(matStart + i)
307200 i += 1
@@ -316,7 +209,7 @@ class BlockMatrix(
316209
317210 /** Collects data and assembles a local dense breeze matrix (for test only). */
318211 private [mllib] def toBreeze (): BDM [Double ] = {
319- val localMat = collect ()
320- new BDM [Double ](localMat.numRows, localMat.numCols, localMat.values )
212+ val localMat = toLocalMatrix ()
213+ new BDM [Double ](localMat.numRows, localMat.numCols, localMat.toArray )
321214 }
322215}
0 commit comments