@@ -19,10 +19,10 @@ package org.apache.spark.mllib.stat.correlation
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{Logging, HashPartitioner}
+import org.apache.spark.Logging
 import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
-import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+import org.apache.spark.rdd.RDD
 
 /**
  * Compute Spearman's correlation for two RDDs of the type RDD[Double] or the correlation matrix
@@ -43,87 +43,51 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
   /**
    * Compute Spearman's correlation matrix S, for the input matrix, where S(i, j) is the
    * correlation between column i and j.
-   *
-   * Input RDD[Vector] should be cached or checkpointed if possible since it would be split into
-   * numCol RDD[Double]s, each of which sorted, then joined back into a single RDD[Vector].
    */
   override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
-    val indexed = X.zipWithUniqueId()
-
-    val numCols = X.first.size
-    if (numCols > 50) {
-      logWarning("Computing the Spearman correlation matrix can be slow for large RDDs with more"
-        + " than 50 columns.")
-    }
-    val ranks = new Array[RDD[(Long, Double)]](numCols)
-
-    // Note: we use a for loop here instead of a while loop with a single index variable
-    // to avoid race condition caused by closure serialization
-    for (k <- 0 until numCols) {
-      val column = indexed.map { case (vector, index) => (vector(k), index) }
-      ranks(k) = getRanks(column)
+    // ((columnIndex, value), rowUid)
+    val colBased = X.zipWithUniqueId().flatMap { case (vec, uid) =>
+      vec.toArray.view.zipWithIndex.map { case (v, j) =>
+        ((j, v), uid)
+      }
     }
-
-    val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
-    PearsonCorrelation.computeCorrelationMatrix(ranksMat)
-  }
-
-  /**
-   * Compute the ranks for elements in the input RDD, using the average method for ties.
-   *
-   * With the average method, elements with the same value receive the same rank that's computed
-   * by taking the average of their positions in the sorted list.
-   * e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
-   * Note that positions here are 0-indexed, instead of 1-indexed as in the standard definition
-   * of ranks for Spearman's correlation. This does not affect the final results and is slightly
-   * more performant.
-   *
-   * @param indexed RDD[(Double, Long)] containing pairs of the format (originalValue, uniqueId)
-   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, rank), where uniqueId
-   *         is copied from the input RDD.
-   */
-  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] = {
-    // Get elements' positions in the sorted list for computing average rank for duplicate values
-    val sorted = indexed.sortByKey().zipWithIndex()
-
-    val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
-      // add an extra element to signify the end of the list so that flatMap can flush the last
-      // batch of duplicates
-      val end = -1L
-      val padded = iter ++ Iterator[((Double, Long), Long)](((Double.NaN, end), end))
-      val firstEntry = padded.next()
-      var lastVal = firstEntry._1._1
-      var firstRank = firstEntry._2.toDouble
-      val idBuffer = ArrayBuffer(firstEntry._1._2)
-      padded.flatMap { case ((v, id), rank) =>
-        if (v == lastVal && id != end) {
-          idBuffer += id
-          Iterator.empty
-        } else {
-          val entries = if (idBuffer.size == 1) {
-            Iterator((idBuffer(0), firstRank))
-          } else {
-            val averageRank = firstRank + (idBuffer.size - 1.0) / 2.0
-            idBuffer.map(id => (id, averageRank))
-          }
-          lastVal = v
-          firstRank = rank
-          idBuffer.clear()
-          idBuffer += id
-          entries
+    // global sort by (columnIndex, value)
+    val sorted = colBased.sortByKey()
+    // assign global ranks (using average ranks for tied values)
+    val globalRanks = sorted.zipWithIndex().mapPartitions { iter =>
+      var preCol = -1
+      var preVal = Double.NaN
+      var startRank = -1.0
+      var cachedUids = ArrayBuffer.empty[Long]
+      val flush: () => Iterable[(Long, (Int, Double))] = () => {
+        val averageRank = startRank + (cachedUids.size - 1) / 2.0
+        val output = cachedUids.map { uid =>
+          (uid, (preCol, averageRank))
         }
+        cachedUids.clear()
+        output
       }
+      iter.flatMap { case (((j, v), uid), rank) =>
+        // If we see a new value or cachedUids is too big, we flush ids with their average rank.
+        if (j != preCol || v != preVal || cachedUids.size >= 10000000) {
+          val output = flush()
+          preCol = j
+          preVal = v
+          startRank = rank
+          cachedUids += uid
+          output
+        } else {
+          cachedUids += uid
+          Iterator.empty
+        }
+      } ++ flush()
     }
-    ranks
-  }
-
-  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: RDD[Vector]): RDD[Vector] = {
-    val partitioner = new HashPartitioner(input.partitions.size)
-    val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
-    cogrouped.map {
-      case (_, values: Array[Iterable[_]]) =>
-        val doubles = values.asInstanceOf[Array[Iterable[Double]]]
-        new DenseVector(doubles.flatten.toArray)
+    // Replace values in the input matrix by their ranks compared with values in the same column.
+    // Note that shifting all ranks in a column by a constant value doesn't affect the result.
+    val groupedRanks = globalRanks.groupByKey().map { case (uid, iter) =>
+      // sort by column index and then convert values to a vector
+      Vectors.dense(iter.toSeq.sortBy(_._1).map(_._2).toArray)
     }
+    PearsonCorrelation.computeCorrelationMatrix(groupedRanks)
   }
 }
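
Note on the tie handling: the `flush()` closure in the patch assigns tied values the average of their 0-based positions in the global sort, the same "average method" described in the removed `getRanks` doc. Below is a minimal local sketch of that method on a plain Scala collection (a hypothetical helper, not part of the patch), which reproduces the example from the removed doc, `ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]`:

```scala
object RankExample {
  // Average ranks with ties, mirroring the flush() logic in the patch,
  // but computed locally instead of over a sorted RDD.
  def ranks(values: Seq[Double]): Seq[Double] = {
    // Sort (value, originalIndex) pairs by value, then attach 0-based sorted positions.
    val sorted = values.zipWithIndex.sortBy(_._1).zipWithIndex
    // Group tied values and assign each group the average of its sorted positions.
    sorted
      .groupBy { case ((v, _), _) => v }
      .values
      .flatMap { group =>
        val averageRank = group.map(_._2).sum.toDouble / group.size
        group.map { case ((_, origIndex), _) => (origIndex, averageRank) }
      }
      .toSeq
      .sortBy(_._1) // restore the original element order
      .map(_._2)
  }

  def main(args: Array[String]): Unit = {
    // Prints: 2.5, 1.0, 0.0, 2.5
    println(ranks(Seq(2.0, 1.0, 0.0, 2.0)).mkString(", "))
  }
}
```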
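For end-to-end testing, the rewritten path is reachable through the public `Statistics.corr` API, which dispatches `"spearman"` to `SpearmanCorrelation.computeCorrelationMatrix`. A usage sketch, assuming an existing `SparkContext` named `sc` (e.g., in `spark-shell`):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

// Rows of the input matrix; example data, three rows by three columns.
val X = sc.parallelize(Seq(
  Vectors.dense(1.0, 10.0, 100.0),
  Vectors.dense(2.0, 20.0, 200.0),
  Vectors.dense(5.0, 33.0, 366.0)))

// "spearman" selects SpearmanCorrelation under the hood.
val corrMatrix = Statistics.corr(X, "spearman")
println(corrMatrix)
```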