1717
1818package org .apache .spark .mllib .regression
1919
20- import org .apache .spark .mllib .linalg .Vector
21- import org .apache .spark .mllib .regression .MonotonicityConstraint .MonotonicityConstraint ._
20+ import org .apache .spark .api .java .{JavaRDD , JavaPairRDD }
2221import org .apache .spark .rdd .RDD
2322
24- /**
25- * Monotonicity constrains for monotone regression
26- * Isotonic (increasing)
27- * Antitonic (decreasing)
28- */
29- object MonotonicityConstraint {
30-
31- object MonotonicityConstraint {
32-
33- sealed trait MonotonicityConstraint {
34- private [regression] def holds (
35- current : WeightedLabeledPoint ,
36- next : WeightedLabeledPoint ): Boolean
37- }
38-
39- /**
40- * Isotonic monotonicity constraint. Increasing sequence
41- */
42- case object Isotonic extends MonotonicityConstraint {
43- override def holds (current : WeightedLabeledPoint , next : WeightedLabeledPoint ): Boolean = {
44- current.label <= next.label
45- }
46- }
47-
48- /**
49- * Antitonic monotonicity constrain. Decreasing sequence
50- */
51- case object Antitonic extends MonotonicityConstraint {
52- override def holds (current : WeightedLabeledPoint , next : WeightedLabeledPoint ): Boolean = {
53- current.label >= next.label
54- }
55- }
56- }
57-
58- val Isotonic = MonotonicityConstraint .Isotonic
59- val Antitonic = MonotonicityConstraint .Antitonic
60- }
61-
6223/**
6324 * Regression model for Isotonic regression
6425 *
6526 * @param predictions Weights computed for every feature.
66- * @param monotonicityConstraint specifies if the sequence is increasing or decreasing
27+ * @param isotonic isotonic ( increasing) or antitonic ( decreasing) sequence
6728 */
68- class IsotonicRegressionModel (
69- val predictions : Seq [WeightedLabeledPoint ],
70- val monotonicityConstraint : MonotonicityConstraint )
71- extends RegressionModel {
29+ class IsotonicRegressionModel (
30+ val predictions : Seq [( Double , Double , Double ) ],
31+ val isotonic : Boolean )
32+ extends Serializable {
7233
73- override def predict (testData : RDD [Vector ]): RDD [Double ] =
34+ /**
35+ * Predict labels for provided features
36+ *
37+ * @param testData features to be labeled
38+ * @return predicted labels
39+ */
40+ def predict (testData : RDD [Double ]): RDD [Double ] =
7441 testData.map(predict)
7542
76- override def predict (testData : Vector ): Double = {
43+ /**
44+ * Predict labels for provided features
45+ *
46+ * @param testData features to be labeled
47+ * @return predicted labels
48+ */
49+ def predict (testData : JavaRDD [java.lang.Double ]): RDD [java.lang.Double ] =
50+ testData.rdd.map(x => x.doubleValue()).map(predict)
51+
52+ /**
53+ * Predict a single label
54+ *
55+ * @param testData feature to be labeled
56+ * @return predicted label
57+ */
58+ def predict (testData : Double ): Double =
7759 // Take the highest of data points smaller than our feature or data point with lowest feature
78- (predictions.head +:
79- predictions.filter(y => y.features.toArray.head <= testData.toArray.head)).last.label
80- }
60+ (predictions.head +: predictions.filter(y => y._2 <= testData)).last._1
8161}
8262
8363/**
@@ -91,23 +71,23 @@ trait IsotonicRegressionAlgorithm
9171 *
9272 * @param predictions labels estimated using isotonic regression algorithm.
9373 * Used for predictions on new data points.
94- * @param monotonicityConstraint isotonic or antitonic
74+ * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
9575 * @return isotonic regression model
9676 */
9777 protected def createModel (
98- predictions : Seq [WeightedLabeledPoint ],
99- monotonicityConstraint : MonotonicityConstraint ): IsotonicRegressionModel
78+ predictions : Seq [( Double , Double , Double ) ],
79+ isotonic : Boolean ): IsotonicRegressionModel
10080
10181 /**
10282 * Run algorithm to obtain isotonic regression model
10383 *
104- * @param input data
105- * @param monotonicityConstraint ascending or descenting
84+ * @param input (label, feature, weight)
85+ * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
10686 * @return isotonic regression model
10787 */
10888 def run (
109- input : RDD [WeightedLabeledPoint ],
110- monotonicityConstraint : MonotonicityConstraint ): IsotonicRegressionModel
89+ input : RDD [( Double , Double , Double ) ],
90+ isotonic : Boolean ): IsotonicRegressionModel
11191}
11292
11393/**
@@ -117,17 +97,17 @@ class PoolAdjacentViolators private [mllib]
11797 extends IsotonicRegressionAlgorithm {
11898
11999 override def run (
120- input : RDD [WeightedLabeledPoint ],
121- monotonicityConstraint : MonotonicityConstraint ): IsotonicRegressionModel = {
100+ input : RDD [( Double , Double , Double ) ],
101+ isotonic : Boolean ): IsotonicRegressionModel = {
122102 createModel(
123- parallelPoolAdjacentViolators(input, monotonicityConstraint ),
124- monotonicityConstraint )
103+ parallelPoolAdjacentViolators(input, isotonic ),
104+ isotonic )
125105 }
126106
127107 override protected def createModel (
128- predictions : Seq [WeightedLabeledPoint ],
129- monotonicityConstraint : MonotonicityConstraint ): IsotonicRegressionModel = {
130- new IsotonicRegressionModel (predictions, monotonicityConstraint )
108+ predictions : Seq [( Double , Double , Double ) ],
109+ isotonic : Boolean ): IsotonicRegressionModel = {
110+ new IsotonicRegressionModel (predictions, isotonic )
131111 }
132112
133113 /**
@@ -138,32 +118,40 @@ class PoolAdjacentViolators private [mllib]
138118 * Method in situ mutates input array
139119 *
140120 * @param in input data
141- * @param monotonicityConstraint asc or desc
121+ * @param isotonic asc or desc
142122 * @return result
143123 */
144124 private def poolAdjacentViolators (
145- in : Array [WeightedLabeledPoint ],
146- monotonicityConstraint : MonotonicityConstraint ): Array [WeightedLabeledPoint ] = {
125+ in : Array [( Double , Double , Double ) ],
126+ isotonic : Boolean ): Array [( Double , Double , Double ) ] = {
147127
148128 // Pools sub array within given bounds assigning weighted average value to all elements
149- def pool (in : Array [WeightedLabeledPoint ], start : Int , end : Int ): Unit = {
129+ def pool (in : Array [( Double , Double , Double ) ], start : Int , end : Int ): Unit = {
150130 val poolSubArray = in.slice(start, end + 1 )
151131
152- val weightedSum = poolSubArray.map(lp => lp.label * lp.weight ).sum
153- val weight = poolSubArray.map(_.weight ).sum
132+ val weightedSum = poolSubArray.map(lp => lp._1 * lp._3 ).sum
133+ val weight = poolSubArray.map(_._3 ).sum
154134
155135 for (i <- start to end) {
156- in(i) = WeightedLabeledPoint (weightedSum / weight, in(i).features , in(i).weight )
136+ in(i) = (weightedSum / weight, in(i)._2 , in(i)._3 )
157137 }
158138 }
159139
140+ val isotonicConstraint : (Double , Double ) => Boolean = (x, y) => x <= y
141+ val antitonicConstraint : (Double , Double ) => Boolean = (x, y) => x >= y
142+
143+ def monotonicityConstraint (isotonic : Boolean ) =
144+ if (isotonic) isotonicConstraint else antitonicConstraint
145+
146+ val monotonicityConstraintHolds = monotonicityConstraint(isotonic)
147+
160148 var i = 0
161149
162150 while (i < in.length) {
163151 var j = i
164152
165153 // Find monotonicity violating sequence, if any
166- while (j < in.length - 1 && ! monotonicityConstraint.holds (in(j), in(j + 1 ))) {
154+ while (j < in.length - 1 && ! monotonicityConstraintHolds (in(j)._1 , in(j + 1 )._1 )) {
167155 j = j + 1
168156 }
169157
@@ -173,7 +161,7 @@ class PoolAdjacentViolators private [mllib]
173161 } else {
174162 // Otherwise pool the violating sequence
175163 // And check if pooling caused monotonicity violation in previously processed points
176- while (i >= 0 && ! monotonicityConstraint.holds (in(i), in(i + 1 ))) {
164+ while (i >= 0 && ! monotonicityConstraintHolds (in(i)._1 , in(i + 1 )._1 )) {
177165 pool(in, i, j)
178166 i = i - 1
179167 }
@@ -190,19 +178,19 @@ class PoolAdjacentViolators private [mllib]
190178 * Calls Pool adjacent violators on each partition and then again on the result
191179 *
192180 * @param testData input
193- * @param monotonicityConstraint asc or desc
181+ * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
194182 * @return result
195183 */
196184 private def parallelPoolAdjacentViolators (
197- testData : RDD [WeightedLabeledPoint ],
198- monotonicityConstraint : MonotonicityConstraint ): Seq [WeightedLabeledPoint ] = {
185+ testData : RDD [( Double , Double , Double ) ],
186+ isotonic : Boolean ): Seq [( Double , Double , Double ) ] = {
199187
200188 poolAdjacentViolators(
201189 testData
202- .sortBy(_.features.toArray.head )
190+ .sortBy(_._2 )
203191 .cache()
204- .mapPartitions(it => poolAdjacentViolators(it.toArray, monotonicityConstraint ).toIterator)
205- .collect(), monotonicityConstraint )
192+ .mapPartitions(it => poolAdjacentViolators(it.toArray, isotonic ).toIterator)
193+ .collect(), isotonic )
206194 }
207195}
208196
@@ -212,20 +200,35 @@ class PoolAdjacentViolators private [mllib]
212200object IsotonicRegression {
213201
214202 /**
215- * Train a monotone regression model given an RDD of (label, features, weight).
216- * Currently only one dimensional algorithm is supported (features.length is one)
203+ * Train a monotone regression model given an RDD of (label, feature, weight).
217204 * Label is the dependent y value
218205 * Weight of the data point is the number of measurements. Default is 1
219206 *
220- * @param input RDD of (label, array of features , weight).
207+ * @param input RDD of (label, feature , weight).
221208 * Each point describes a row of the data
222209 * matrix A as well as the corresponding right hand side label y
223210 * and weight as number of measurements
224- * @param monotonicityConstraint Isotonic (increasing) or Antitonic (decreasing) sequence
211+ * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
212+ */
213+ def train (
214+ input : RDD [(Double , Double , Double )],
215+ isotonic : Boolean = true ): IsotonicRegressionModel = {
216+ new PoolAdjacentViolators ().run(input, isotonic)
217+ }
218+
219+ /**
220+ * Train a monotone regression model given an RDD of (label, feature).
221+ * Label is the dependent y value
222+ * Weight defaults to 1
223+ *
224+ * @param input RDD of (label, feature).
225+ * @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
226+ * @return
225227 */
226228 def train (
227- input : RDD [WeightedLabeledPoint ],
228- monotonicityConstraint : MonotonicityConstraint = Isotonic ): IsotonicRegressionModel = {
229- new PoolAdjacentViolators ().run(input, monotonicityConstraint)
229+ input : JavaPairRDD [java.lang.Double , java.lang.Double ],
230+ isotonic : Boolean ): IsotonicRegressionModel = {
231+ new PoolAdjacentViolators ()
232+ .run(input.rdd.map(x => (x._1.doubleValue(), x._2.doubleValue(), 1d )), isotonic)
230233 }
231234}
0 commit comments