@@ -51,16 +51,16 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
5151 vec.toArray.view.zipWithIndex.map { case (v, j) =>
5252 ((j, v), uid)
5353 }
54- }.persist( StorageLevel . MEMORY_AND_DISK ) // used by sortByKey
54+ }
5555 // global sort by (columnIndex, value)
56- val sorted = colBased.sortByKey().persist( StorageLevel . MEMORY_AND_DISK ) // used by zipWithIndex
56+ val sorted = colBased.sortByKey()
5757 // Assign global ranks (using average ranks for tied values)
5858 val globalRanks = sorted.zipWithIndex().mapPartitions { iter =>
5959 var preCol = - 1
6060 var preVal = Double .NaN
6161 var startRank = - 1.0
6262 var cachedIds = ArrayBuffer .empty[Long ]
63- def flush () : Iterable [(Long , (Int , Double ))] = {
63+ def flush : () => Iterable [(Long , (Int , Double ))] = () => {
6464 val averageRank = startRank + (cachedIds.size - 1 ) / 2.0
6565 val output = cachedIds.map { i =>
6666 (i, (preCol, averageRank))
@@ -69,7 +69,8 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
6969 output
7070 }
7171 iter.flatMap { case (((j, v), uid), rank) =>
72- if (j != preCol || v != preVal) {
72+ // If we see a new value or cachedIds is too big, we flush ids with their average rank.
73+ if (j != preCol || v != preVal || cachedIds.size >= 10000000 ) {
7374 val output = flush()
7475 preCol = j
7576 preVal = v
0 commit comments