Skip to content

Commit 30363ed

Browse files
Reza Zadehmengxr
authored andcommitted
[MLlib] [SPARK-6713] Iterators in columnSimilarities for mapPartitionsWithIndex
Use Iterators in columnSimilarities to allow mapPartitionsWithIndex to spill to disk. This could happen in a dense and large column - this way Spark can spill the pairs onto disk instead of building all the pairs before handing them to Spark. Another PR coming to update documentation. Author: Reza Zadeh <[email protected]> Closes apache#5364 from rezazadeh/optmemsim and squashes the following commits: 47c90ba [Reza Zadeh] Iterators in columnSimilarities for flatMap
1 parent 9fe4125 commit 30363ed

File tree

1 file changed

+9
-10
lines changed
  • mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed

1 file changed

+9
-10
lines changed

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,6 @@ class RowMatrix(
531531
val rand = new XORShiftRandom(indx)
532532
val scaled = new Array[Double](p.size)
533533
iter.flatMap { row =>
534-
val buf = new ListBuffer[((Int, Int), Double)]()
535534
row match {
536535
case SparseVector(size, indices, values) =>
537536
val nnz = indices.size
@@ -540,8 +539,9 @@ class RowMatrix(
540539
scaled(k) = values(k) / q(indices(k))
541540
k += 1
542541
}
543-
k = 0
544-
while (k < nnz) {
542+
543+
Iterator.tabulate (nnz) { k =>
544+
val buf = new ListBuffer[((Int, Int), Double)]()
545545
val i = indices(k)
546546
val iVal = scaled(k)
547547
if (iVal != 0 && rand.nextDouble() < p(i)) {
@@ -555,17 +555,17 @@ class RowMatrix(
555555
l += 1
556556
}
557557
}
558-
k += 1
559-
}
558+
buf
559+
}.flatten
560560
case DenseVector(values) =>
561561
val n = values.size
562562
var i = 0
563563
while (i < n) {
564564
scaled(i) = values(i) / q(i)
565565
i += 1
566566
}
567-
i = 0
568-
while (i < n) {
567+
Iterator.tabulate (n) { i =>
568+
val buf = new ListBuffer[((Int, Int), Double)]()
569569
val iVal = scaled(i)
570570
if (iVal != 0 && rand.nextDouble() < p(i)) {
571571
var j = i + 1
@@ -577,10 +577,9 @@ class RowMatrix(
577577
j += 1
578578
}
579579
}
580-
i += 1
581-
}
580+
buf
581+
}.flatten
582582
}
583-
buf
584583
}
585584
}.reduceByKey(_ + _).map { case ((i, j), sim) =>
586585
MatrixEntry(i.toLong, j.toLong, sim)

0 commit comments

Comments
 (0)