
Commit 0975019

johnc1231 authored and srowen committed
[SPARK-20109][MLLIB] Rewrote toBlockMatrix method on IndexedRowMatrix
## What changes were proposed in this pull request?

~~I added the method `toBlockMatrixDense` to the IndexedRowMatrix class. The current implementation of `toBlockMatrix` is insufficient for users with relatively dense IndexedRowMatrix objects, since it assumes sparsity.~~

EDIT: Ended up deciding that there should be just a single `toBlockMatrix` method, which creates a BlockMatrix whose blocks may be dense or sparse depending on the sparsity of the rows. This method will work better on any current use case of `toBlockMatrix` and doesn't go through `CoordinateMatrix` like the old method.

## How was this patch tested?

~~I used the same tests already written for `toBlockMatrix()` to test this method. I also added a new additional unit test for an edge case that was not adequately tested by current test suite.~~

I ran the original `IndexedRowMatrix` tests, plus wrote more to better handle edge cases ignored by original tests.

Author: John Compitello <[email protected]>

Closes #17459 from johnc1231/johnc-fix-ir-to-block.
1 parent 6d05c1c commit 0975019
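
For orientation, here is a minimal usage sketch of the reworked conversion. This is not code from the commit: the object name, local master setting, and example rows are illustrative assumptions.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

object ToBlockMatrixSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("toBlockMatrix-sketch").setMaster("local[2]"))

    // One fully dense row and one nearly empty row in an 8 x 8 matrix.
    val rows = sc.parallelize(Seq(
      IndexedRow(0L, Vectors.dense(Array.tabulate(8)(_ + 1.0))),
      IndexedRow(7L, Vectors.sparse(8, Seq((1, 4.0))))))

    val indexedRowMat = new IndexedRowMatrix(rows)

    // 4 x 4 blocks: after this change, each block's backing storage depends on how
    // many of its entries are populated, instead of always going through CoordinateMatrix.
    val blockMat = indexedRowMat.toBlockMatrix(4, 4)
    blockMat.validate()

    sc.stop()
  }
}

With rows like these, the blocks covering the dense row should come back dense, while the block holding only the single sparse entry falls below the 10% fill threshold used by the new implementation and should come back sparse.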

File tree

3 files changed: +157 -11 lines changed

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/CoordinateMatrix.scala

Lines changed: 7 additions & 0 deletions
@@ -125,6 +125,13 @@ class CoordinateMatrix @Since("1.0.0") (
       s"colsPerBlock needs to be greater than 0. colsPerBlock: $colsPerBlock")
     val m = numRows()
     val n = numCols()
+
+    // Since block matrices require an integer row and col index
+    require(math.ceil(m.toDouble / rowsPerBlock) <= Int.MaxValue,
+      "Number of rows divided by rowsPerBlock cannot exceed maximum integer.")
+    require(math.ceil(n.toDouble / colsPerBlock) <= Int.MaxValue,
+      "Number of cols divided by colsPerBlock cannot exceed maximum integer.")
+
     val numRowBlocks = math.ceil(m.toDouble / rowsPerBlock).toInt
     val numColBlocks = math.ceil(n.toDouble / colsPerBlock).toInt
     val partitioner = GridPartitioner(numRowBlocks, numColBlocks, entries.partitions.length)
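
An aside on these new guards, as a sketch with a hypothetical oversized row count rather than code from this commit: the block counts computed just below are narrowed to Int, and on the JVM a Double-to-Int conversion saturates at Int.MaxValue instead of failing, so an oversized matrix would otherwise get a silently wrong block count.

import scala.util.Try

// Hypothetical values: a row count above Int.MaxValue with the worst-case block size of 1.
val m: Long = 5000000000L
val rowsPerBlock = 1
val rowBlockCount = math.ceil(m.toDouble / rowsPerBlock)

// Without the guard, the narrowing conversion silently clamps to Int.MaxValue.
assert(rowBlockCount.toInt == Int.MaxValue)

// The added require turns that silent truncation into an explicit IllegalArgumentException.
val guarded = Try(require(rowBlockCount <= Int.MaxValue,
  "Number of rows divided by rowsPerBlock cannot exceed maximum integer."))
assert(guarded.isFailure)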

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala

Lines changed: 66 additions & 4 deletions
@@ -91,15 +91,15 @@ class IndexedRowMatrix @Since("1.0.0") (
   }
 
   /**
-   * Converts to BlockMatrix. Creates blocks of `SparseMatrix` with size 1024 x 1024.
+   * Converts to BlockMatrix. Creates blocks with size 1024 x 1024.
    */
   @Since("1.3.0")
   def toBlockMatrix(): BlockMatrix = {
     toBlockMatrix(1024, 1024)
   }
 
   /**
-   * Converts to BlockMatrix. Creates blocks of `SparseMatrix`.
+   * Converts to BlockMatrix. Blocks may be sparse or dense depending on the sparsity of the rows.
    * @param rowsPerBlock The number of rows of each block. The blocks at the bottom edge may have
    *                     a smaller value. Must be an integer value greater than 0.
    * @param colsPerBlock The number of columns of each block. The blocks at the right edge may have
@@ -108,8 +108,70 @@ class IndexedRowMatrix @Since("1.0.0") (
    */
   @Since("1.3.0")
   def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = {
-    // TODO: This implementation may be optimized
-    toCoordinateMatrix().toBlockMatrix(rowsPerBlock, colsPerBlock)
+    require(rowsPerBlock > 0,
+      s"rowsPerBlock needs to be greater than 0. rowsPerBlock: $rowsPerBlock")
+    require(colsPerBlock > 0,
+      s"colsPerBlock needs to be greater than 0. colsPerBlock: $colsPerBlock")
+
+    val m = numRows()
+    val n = numCols()
+
+    // Since block matrices require an integer row index
+    require(math.ceil(m.toDouble / rowsPerBlock) <= Int.MaxValue,
+      "Number of rows divided by rowsPerBlock cannot exceed maximum integer.")
+
+    // The remainder calculations only matter when m % rowsPerBlock != 0 or n % colsPerBlock != 0
+    val remainderRowBlockIndex = m / rowsPerBlock
+    val remainderColBlockIndex = n / colsPerBlock
+    val remainderRowBlockSize = (m % rowsPerBlock).toInt
+    val remainderColBlockSize = (n % colsPerBlock).toInt
+    val numRowBlocks = math.ceil(m.toDouble / rowsPerBlock).toInt
+    val numColBlocks = math.ceil(n.toDouble / colsPerBlock).toInt
+
+    val blocks = rows.flatMap { ir: IndexedRow =>
+      val blockRow = ir.index / rowsPerBlock
+      val rowInBlock = ir.index % rowsPerBlock
+
+      ir.vector match {
+        case SparseVector(size, indices, values) =>
+          indices.zip(values).map { case (index, value) =>
+            val blockColumn = index / colsPerBlock
+            val columnInBlock = index % colsPerBlock
+            ((blockRow.toInt, blockColumn.toInt), (rowInBlock.toInt, Array((value, columnInBlock))))
+          }
+        case DenseVector(values) =>
+          values.grouped(colsPerBlock)
+            .zipWithIndex
+            .map { case (values, blockColumn) =>
+              ((blockRow.toInt, blockColumn), (rowInBlock.toInt, values.zipWithIndex))
+            }
+      }
+    }.groupByKey(GridPartitioner(numRowBlocks, numColBlocks, rows.getNumPartitions)).map {
+      case ((blockRow, blockColumn), itr) =>
+        val actualNumRows =
+          if (blockRow == remainderRowBlockIndex) remainderRowBlockSize else rowsPerBlock
+        val actualNumColumns =
+          if (blockColumn == remainderColBlockIndex) remainderColBlockSize else colsPerBlock
+
+        val arraySize = actualNumRows * actualNumColumns
+        val matrixAsArray = new Array[Double](arraySize)
+        var countForValues = 0
+        itr.foreach { case (rowWithinBlock, valuesWithColumns) =>
+          valuesWithColumns.foreach { case (value, columnWithinBlock) =>
+            matrixAsArray.update(columnWithinBlock * actualNumRows + rowWithinBlock, value)
+            countForValues += 1
+          }
+        }
+        val denseMatrix = new DenseMatrix(actualNumRows, actualNumColumns, matrixAsArray)
+        val finalMatrix = if (countForValues / arraySize.toDouble >= 0.1) {
+          denseMatrix
+        } else {
+          denseMatrix.toSparse
+        }
+
+        ((blockRow, blockColumn), finalMatrix)
+    }
+    new BlockMatrix(blocks, rowsPerBlock, colsPerBlock, m, n)
   }
 
   /**
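
The storage decision at the heart of the new implementation above can be stated compactly: each block is assembled into a dense column-major array, the written entries are counted, the block stays dense when at least 10% of its entries were written, and it is converted to sparse otherwise. A minimal standalone sketch of that rule follows; the helper name and the example blocks are illustrative, not part of the patch.

import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix, SparseMatrix}

// Hypothetical helper mirroring the threshold used by the new toBlockMatrix:
// keep a block dense when the fraction of populated entries is at least 0.1.
def chooseBlockStorage(block: DenseMatrix, populatedEntries: Int): Matrix = {
  val totalEntries = block.numRows * block.numCols
  if (populatedEntries / totalEntries.toDouble >= 0.1) block else block.toSparse
}

// A 4 x 4 block holding a single value (1/16 < 0.1) comes back as a SparseMatrix.
val mostlyEmpty = new DenseMatrix(4, 4, Array.tabulate(16)(i => if (i == 5) 3.0 else 0.0))
assert(chooseBlockStorage(mostlyEmpty, populatedEntries = 1).isInstanceOf[SparseMatrix])

// A 2 x 2 block holding a single value (1/4 >= 0.1) stays dense.
val smallBlock = new DenseMatrix(2, 2, Array(0.0, 7.0, 0.0, 0.0))
assert(chooseBlockStorage(smallBlock, populatedEntries = 1).isInstanceOf[DenseMatrix])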

mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrixSuite.scala

Lines changed: 84 additions & 7 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.linalg.distributed
 import breeze.linalg.{diag => brzDiag, DenseMatrix => BDM, DenseVector => BDV}
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.mllib.linalg.{Matrices, Vectors}
+import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.rdd.RDD
 
@@ -87,19 +87,96 @@ class IndexedRowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(coordMat.toBreeze() === idxRowMat.toBreeze())
   }
 
-  test("toBlockMatrix") {
-    val idxRowMat = new IndexedRowMatrix(indexedRows)
-    val blockMat = idxRowMat.toBlockMatrix(2, 2)
+  test("toBlockMatrix dense backing") {
+    val idxRowMatDense = new IndexedRowMatrix(indexedRows)
+
+    // Tests when n % colsPerBlock != 0
+    val blockMat = idxRowMatDense.toBlockMatrix(2, 2)
     assert(blockMat.numRows() === m)
     assert(blockMat.numCols() === n)
-    assert(blockMat.toBreeze() === idxRowMat.toBreeze())
+    assert(blockMat.toBreeze() === idxRowMatDense.toBreeze())
+
+    // Tests when m % rowsPerBlock != 0
+    val blockMat2 = idxRowMatDense.toBlockMatrix(3, 1)
+    assert(blockMat2.numRows() === m)
+    assert(blockMat2.numCols() === n)
+    assert(blockMat2.toBreeze() === idxRowMatDense.toBreeze())
 
     intercept[IllegalArgumentException] {
-      idxRowMat.toBlockMatrix(-1, 2)
+      idxRowMatDense.toBlockMatrix(-1, 2)
     }
     intercept[IllegalArgumentException] {
-      idxRowMat.toBlockMatrix(2, 0)
+      idxRowMatDense.toBlockMatrix(2, 0)
     }
+
+    assert(blockMat.blocks.map { case (_, matrix: Matrix) =>
+      matrix.isInstanceOf[DenseMatrix]
+    }.reduce(_ && _))
+    assert(blockMat2.blocks.map { case (_, matrix: Matrix) =>
+      matrix.isInstanceOf[DenseMatrix]
+    }.reduce(_ && _))
+  }
+
+  test("toBlockMatrix sparse backing") {
+    val sparseData = Seq(
+      (15L, Vectors.sparse(12, Seq((0, 4.0))))
+    ).map(x => IndexedRow(x._1, x._2))
+
+    // Gonna make m and n larger here so the matrices can easily be completely sparse:
+    val m = 16
+    val n = 12
+
+    val idxRowMatSparse = new IndexedRowMatrix(sc.parallelize(sparseData))
+
+    // Tests when n % colsPerBlock != 0
+    val blockMat = idxRowMatSparse.toBlockMatrix(8, 8)
+    assert(blockMat.numRows() === m)
+    assert(blockMat.numCols() === n)
+    assert(blockMat.toBreeze() === idxRowMatSparse.toBreeze())
+
+    // Tests when m % rowsPerBlock != 0
+    val blockMat2 = idxRowMatSparse.toBlockMatrix(6, 6)
+    assert(blockMat2.numRows() === m)
+    assert(blockMat2.numCols() === n)
+    assert(blockMat2.toBreeze() === idxRowMatSparse.toBreeze())
+
+    assert(blockMat.blocks.collect().forall{ case (_, matrix: Matrix) =>
+      matrix.isInstanceOf[SparseMatrix]
+    })
+    assert(blockMat2.blocks.collect().forall{ case (_, matrix: Matrix) =>
+      matrix.isInstanceOf[SparseMatrix]
+    })
+  }
+
+  test("toBlockMatrix mixed backing") {
+    val m = 24
+    val n = 18
+
+    val mixedData = Seq(
+      (0L, Vectors.dense((0 to 17).map(_.toDouble).toArray)),
+      (1L, Vectors.dense((0 to 17).map(_.toDouble).toArray)),
+      (23L, Vectors.sparse(18, Seq((0, 4.0)))))
+      .map(x => IndexedRow(x._1, x._2))
+
+    val idxRowMatMixed = new IndexedRowMatrix(
+      sc.parallelize(mixedData))
+
+    // Tests when n % colsPerBlock != 0
+    val blockMat = idxRowMatMixed.toBlockMatrix(12, 12)
+    assert(blockMat.numRows() === m)
+    assert(blockMat.numCols() === n)
+    assert(blockMat.toBreeze() === idxRowMatMixed.toBreeze())
+
+    // Tests when m % rowsPerBlock != 0
+    val blockMat2 = idxRowMatMixed.toBlockMatrix(18, 6)
+    assert(blockMat2.numRows() === m)
+    assert(blockMat2.numCols() === n)
+    assert(blockMat2.toBreeze() === idxRowMatMixed.toBreeze())
+
+    val blocks = blockMat.blocks.collect()
+
+    assert(blocks.forall { case((row, col), matrix) =>
+      if (row == 0) matrix.isInstanceOf[DenseMatrix] else matrix.isInstanceOf[SparseMatrix]})
   }
 
   test("multiply a local matrix") {
