@@ -24,88 +24,142 @@ import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
2424import org .apache .spark .rdd .RDD
2525
/**
 * Regression model for isotonic regression.
 *
 * @param boundaries Array of boundaries for which predictions are known.
 *                   Must be sorted in increasing order.
 * @param predictions Array of predictions associated to the boundaries at the same index.
 */
class IsotonicRegressionModel(
    boundaries: Array[Double],
    val predictions: Array[Double])
  extends Serializable {

  /** Returns true if `xs` is sorted in non-decreasing order. */
  private def isSorted(xs: Array[Double]): Boolean =
    (1 until xs.length).forall(i => xs(i) >= xs(i - 1))

  // Validate constructor arguments eagerly: unsorted boundaries or mismatched
  // arrays would silently produce wrong predictions later. `require` (rather
  // than `assert`) is the Scala convention for argument validation and is not
  // stripped by -Xdisable-assertions. NOTE: the previous isSorted discarded its
  // `false` result (`if (...) false` without `return`), so the old assertion
  // could never fail.
  require(boundaries.nonEmpty, "Model must contain at least one boundary.")
  require(isSorted(boundaries), "Boundaries must be sorted in increasing order.")
  require(boundaries.length == predictions.length,
    "Boundaries and predictions arrays must have the same length.")

  /**
   * Predict labels for provided features using a piecewise linear function.
   *
   * @param testData Features to be labeled.
   * @return Predicted labels.
   */
  def predict(testData: RDD[Double]): RDD[Double] =
    testData.map(predict)

  /**
   * Predict labels for provided features using a piecewise linear function
   * (Java-friendly version).
   *
   * @param testData Features to be labeled.
   * @return Predicted labels.
   */
  def predict(testData: JavaDoubleRDD): JavaDoubleRDD =
    JavaDoubleRDD.fromRDD(predict(testData.rdd.asInstanceOf[RDD[Double]]))

  /**
   * Predict a single label using a piecewise linear function.
   *
   * @param testData Feature to be labeled.
   * @return Predicted label.
   *         If testData exactly matches a boundary then the associated prediction
   *         is returned directly.
   *         If testData is lower or higher than all boundaries then the first or
   *         last prediction is returned respectively.
   *         If testData falls between two boundaries then predictions is treated
   *         as a piecewise linear function and the interpolated value is returned.
   */
  def predict(testData: Double): Double = {

    // Linear interpolation between (x1, y1) and (x2, y2) evaluated at x.
    def linearInterpolation(x1: Double, y1: Double, x2: Double, y2: Double, x: Double): Double = {
      y1 + (y2 - y1) * (x - x1) / (x2 - x1)
    }

    // binarySearch returns the index on an exact match, otherwise
    // (-insertionPoint - 1) where insertionPoint is where testData would be inserted.
    val insertIndex = binarySearch(boundaries, testData)
    val normalisedInsertIndex = -insertIndex - 1

    // Find if the value was lower than all boundaries,
    // higher than all boundaries, between two boundaries, or an exact match.
    if (insertIndex == -1) {
      // Lower than all boundaries.
      predictions.head
    } else if (normalisedInsertIndex == boundaries.length) {
      // Higher than all boundaries.
      predictions.last
    } else if (insertIndex < 0) {
      // Strictly between two boundaries: interpolate.
      linearInterpolation(
        boundaries(normalisedInsertIndex - 1),
        predictions(normalisedInsertIndex - 1),
        boundaries(normalisedInsertIndex),
        predictions(normalisedInsertIndex),
        testData)
    } else {
      // Exact match with a boundary.
      predictions(insertIndex)
    }
  }
}
79109
80110/**
81- * Isotonic regression
82- * Currently implemented using parallel pool adjacent violators algorithm
111+ * Isotonic regression.
112+ * Currently implemented using parallelized pool adjacent violators algorithm.
113+ * Currently only univariate (single feature) algorithm supported.
114+ *
115+ * Sequential PAV implementation based on:
116+ * Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
117+ * "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
118+ *
119+ * Sequential PAV parallelized as per:
120+ * Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
121+ * "An approach to parallelizing isotonic regression."
122+ * Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
83123 */
84- class IsotonicRegression
85- extends Serializable {
124+ class IsotonicRegression extends Serializable {
86125
87126 /**
88- * Run algorithm to obtain isotonic regression model
127+ * Run pool adjacent violators algorithm to obtain isotonic regression model.
128+ *
129+ * @param input RDD of tuples (label, feature, weight) where label is dependent variable
130+ * for which we calculate isotonic regression, feature is independent variable
131+ * and weight represents number of measures with default 1.
89132 *
90- * @param input (label, feature, weight)
91- * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
92- * @return isotonic regression model
133+ * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
134+ * @return Isotonic regression model.
93135 */
94136 def run (
95137 input : RDD [(Double , Double , Double )],
96- isotonic : Boolean = true ): IsotonicRegressionModel = {
97- createModel(
98- parallelPoolAdjacentViolators(input, isotonic),
99- isotonic)
100- }
138+ isotonic : Boolean ): IsotonicRegressionModel =
139+ createModel(parallelPoolAdjacentViolators(input, isotonic), isotonic)
140+
141+ /**
142+ * Run pool adjacent violators algorithm to obtain isotonic regression model.
143+ *
144+ * @param input JavaRDD of tuples (label, feature, weight) where label is dependent variable
145+ * for which we calculate isotonic regression, feature is independent variable
146+ * and weight represents number of measures with default 1.
147+ *
148+ * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
149+ * @return Isotonic regression model.
150+ */
151+ def run (
152+ input : JavaRDD [(java.lang.Double , java.lang.Double , java.lang.Double )],
153+ isotonic : Boolean ): IsotonicRegressionModel =
154+ run(input.rdd.asInstanceOf [RDD [(Double , Double , Double )]], isotonic)
101155
102156 /**
103- * Creates isotonic regression model with given parameters
157+ * Creates isotonic regression model with given parameters.
104158 *
105- * @param predictions labels estimated using isotonic regression algorithm.
159+ * @param predictions Predictions calculated using pool adjacent violators algorithm.
106160 * Used for predictions on new data points.
107- * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
108- * @return isotonic regression model
161+ * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
162+ * @return Isotonic regression model.
109163 */
110164 protected def createModel (
111165 predictions : Array [(Double , Double , Double )],
@@ -118,30 +172,30 @@ class IsotonicRegression
118172 }
119173
120174 /**
121- * Performs a pool adjacent violators algorithm (PAVA)
175+ * Performs a pool adjacent violators algorithm (PAV).
122176 * Uses approach with single processing of data where violators
123177 * in previously processed data created by pooling are fixed immediatelly.
124- * Uses optimization of discovering monotonicity violating sequences (blocks)
125- * Method in situ mutates input array
178+ * Uses optimization of discovering monotonicity violating sequences (blocks).
126179 *
127- * @param in input data
128- * @param isotonic asc or desc
129- * @return result
180+ * @param input Input data of tuples (label, feature, weight).
181+ * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
182+ * @return Result tuples (label, feature, weight) where labels were updated
183+ * to form a monotone sequence as per isotonic regression definition.
130184 */
131185 private def poolAdjacentViolators (
132- in : Array [(Double , Double , Double )],
186+ input : Array [(Double , Double , Double )],
133187 isotonic : Boolean ): Array [(Double , Double , Double )] = {
134188
135- // Pools sub array within given bounds assigning weighted average value to all elements
136- def pool (in : Array [(Double , Double , Double )], start : Int , end : Int ): Unit = {
137- val poolSubArray = in .slice(start, end + 1 )
189+ // Pools sub array within given bounds assigning weighted average value to all elements.
190+ def pool (input : Array [(Double , Double , Double )], start : Int , end : Int ): Unit = {
191+ val poolSubArray = input .slice(start, end + 1 )
138192
139193 val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
140194 val weight = poolSubArray.map(_._3).sum
141195
142196 var i = start
143197 while (i <= end) {
144- in (i) = (weightedSum / weight, in (i)._2, in (i)._3)
198+ input (i) = (weightedSum / weight, input (i)._2, input (i)._3)
145199 i = i + 1
146200 }
147201 }
@@ -150,39 +204,40 @@ class IsotonicRegression
150204 (x, y) => if (isotonic) x <= y else x >= y
151205
152206 var i = 0
153- while (i < in .length) {
207+ while (i < input .length) {
154208 var j = i
155209
156- // Find monotonicity violating sequence, if any
157- while (j < in .length - 1 && ! monotonicityConstraintHolds(in (j)._1, in (j + 1 )._1)) {
210+ // Find monotonicity violating sequence, if any.
211+ while (j < input .length - 1 && ! monotonicityConstraintHolds(input (j)._1, input (j + 1 )._1)) {
158212 j = j + 1
159213 }
160214
161- // If monotonicity was not violated, move to next data point
215+ // If monotonicity was not violated, move to next data point.
162216 if (i == j) {
163217 i = i + 1
164218 } else {
165219 // Otherwise pool the violating sequence
166- // And check if pooling caused monotonicity violation in previously processed points
167- while (i >= 0 && ! monotonicityConstraintHolds(in (i)._1, in (i + 1 )._1)) {
168- pool(in , i, j)
220+ // and check if pooling caused monotonicity violation in previously processed points.
221+ while (i >= 0 && ! monotonicityConstraintHolds(input (i)._1, input (i + 1 )._1)) {
222+ pool(input , i, j)
169223 i = i - 1
170224 }
171225
172226 i = j
173227 }
174228 }
175229
176- in
230+ input
177231 }
178232
179233 /**
180- * Performs parallel pool adjacent violators algorithm
181- * Calls Pool adjacent violators on each partition and then again on the result
234+ * Performs parallel pool adjacent violators algorithm.
235+ * Performs Pool adjacent violators algorithm on each partition and then again on the result.
182236 *
183- * @param testData input
184- * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
185- * @return result
237+ * @param testData Input data of tuples (label, feature, weight).
238+ * @param isotonic Isotonic (increasing) or antitonic (decreasing) sequence.
239+ * @return Result tuples (label, feature, weight) where labels were updated
240+ * to form a monotone sequence as per isotonic regression definition.
186241 */
187242 private def parallelPoolAdjacentViolators (
188243 testData : RDD [(Double , Double , Double )],
@@ -194,45 +249,4 @@ class IsotonicRegression
194249
195250 poolAdjacentViolators(parallelStepResult.collect(), isotonic)
196251 }
197- }
198-
199- /**
200- * Top-level methods for monotone regression (either isotonic or antitonic).
201- */
202- object IsotonicRegression {
203-
204- /**
205- * Train a monotone regression model given an RDD of (label, feature, weight).
206- * Label is the dependent y value
207- * Weight of the data point is the number of measurements. Default is 1
208- *
209- * @param input RDD of (label, feature, weight).
210- * Each point describes a row of the data
211- * matrix A as well as the corresponding right hand side label y
212- * and weight as number of measurements
213- * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
214- */
215- def train (
216- input : RDD [(Double , Double , Double )],
217- isotonic : Boolean = true ): IsotonicRegressionModel = {
218- new IsotonicRegression ().run(input, isotonic)
219- }
220-
221- /**
222- * Train a monotone regression model given an RDD of (label, feature, weight).
223- * Label is the dependent y value
224- * Weight of the data point is the number of measurements. Default is 1
225- *
226- * @param input RDD of (label, feature, weight).
227- * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
228- * @return
229- */
230- def train (
231- input : JavaRDD [(java.lang.Double , java.lang.Double , java.lang.Double )],
232- isotonic : Boolean ): IsotonicRegressionModel = {
233- new IsotonicRegression ()
234- .run(
235- input.rdd.asInstanceOf [RDD [(Double , Double , Double )]],
236- isotonic)
237- }
238- }
252+ }
0 commit comments