
Commit bfade12
Added lots of classes for new ML API:

Abstract classes for learning algorithms:
* Classifier
* Regressor
* Predictor

Traits for learning algorithms:
* HasDefaultEvaluator
* IterativeEstimator
* IterativeSolver
* ProbabilisticClassificationModel
* WeakLearner

Concrete classes: learning algorithms
* AdaBoost (partly implemented)
* NaiveBayes (rough implementation)
* LinearRegression
* LogisticRegression (updated to use new abstract classes)

Concrete classes: evaluation
* ClassificationEvaluator
* RegressionEvaluator
* PredictionEvaluator

Concrete classes: other
* LabeledPoint (adds weight to the old LabeledPoint)
1 parent a83936e commit bfade12

19 files changed: +1001 −58 lines
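As a quick orientation (not part of the diff): a hypothetical sketch of how these pieces are intended to compose, using the setter and fit signatures from the files below. The column names, the `training` SchemaRDD, and `ParamMap.empty` are assumptions, not confirmed API.

    // Hypothetical usage sketch of the new API, based on the setters and
    // fit(...) signatures in this commit; not code from the commit itself.
    import org.apache.spark.ml.classification.{AdaBoost, AdaBoostModel, LogisticRegression}
    import org.apache.spark.ml.param.ParamMap

    val ada = new AdaBoost()
      .setMaxIter(50)                             // boosting rounds
      .setWeakLearner(new LogisticRegression())   // any Classifier[_, _]
      .setWeakLearnerParamMap(ParamMap.empty)     // assumes an empty-ParamMap factory exists

    // `training` is assumed to be a SchemaRDD with "label" and "features" columns.
    val model: AdaBoostModel = ada.fit(training, ParamMap.empty)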
Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
package org.apache.spark.ml

import org.apache.spark.mllib.linalg.Vector

/**
 * Class that represents an instance (data point) for prediction tasks.
 *
 * @param label Label to predict
 * @param features List of features describing this instance
 * @param weight Instance weight
 */
case class LabeledPoint(label: Double, features: Vector, weight: Double) {

  /** Auxiliary constructor which sets the instance weight to 1.0. */
  def this(label: Double, features: Vector) = this(label, features, 1.0)

  override def toString: String = {
    "(%s,%s,%s)".format(label, features, weight)
  }
}

object LabeledPoint {
  /** Constructs a LabeledPoint with unit instance weight. */
  def apply(label: Double, features: Vector): LabeledPoint = new LabeledPoint(label, features)
}
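A small sketch of the two construction paths above (companion apply with unit weight vs. explicit weight); the feature values are made up:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.ml.LabeledPoint

    val p1 = LabeledPoint(1.0, Vectors.dense(0.5, -1.2))        // weight defaults to 1.0
    val p2 = LabeledPoint(0.0, Vectors.dense(0.5, -1.2), 0.25)  // explicit instance weight
    println(p1)  // prints (1.0,[0.5,-1.2],1.0) via the toString above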
Lines changed: 208 additions & 0 deletions
@@ -0,0 +1,208 @@
package org.apache.spark.ml.classification

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.ml.LabeledPoint
import org.apache.spark.ml.evaluation.ClassificationEvaluator
import org.apache.spark.ml.param.{HasWeightCol, Param, ParamMap, HasMaxIter}
import org.apache.spark.ml.impl.estimator.{ProbabilisticClassificationModel, WeakLearner,
  IterativeEstimator, IterativeSolver}


private[classification] trait AdaBoostParams extends ClassifierParams
  with HasMaxIter with HasWeightCol {

  /** param for the weak learning algorithm */
  val weakLearner: Param[Classifier[_, _]] =
    new Param(this, "weakLearner", "weak learning algorithm")
  def getWeakLearner: Classifier[_, _] = get(weakLearner)

  /** param for the weak learner's param map */
  val weakLearnerParamMap: Param[ParamMap] =
    new Param(this, "weakLearnerParamMap", "param map for the weak learner")
  def getWeakLearnerParamMap: ParamMap = get(weakLearnerParamMap)

  override def validate(paramMap: ParamMap): Unit = {
    // TODO: Check maxIter, weakLearner, weakLearnerParamMap, weightCol.
    // Check: If the weak learner does not extend WeakLearner, then featuresColName should be
    // castable to FeaturesType.
  }
}

/**
 * AdaBoost
 *
 * Developer notes:
 * - If the weak learner implements the [[WeakLearner]] trait, AdaBoost uses the weak
 *   learner's native feature representation when extracting LabeledPoints.
 */
class AdaBoost extends Classifier[AdaBoost, AdaBoostModel]
  with AdaBoostParams
  with IterativeEstimator[AdaBoostModel] {

  def setMaxIter(value: Int): this.type = set(maxIter, value)
  def setWeightCol(value: String): this.type = set(weightCol, value)
  def setWeakLearner(value: Classifier[_, _]): this.type = set(weakLearner, value)
  def setWeakLearnerParamMap(value: ParamMap): this.type = set(weakLearnerParamMap, value)

  /**
   * Extract LabeledPoints, using the weak learner's native feature representation if possible.
   * @param paramMap Complete paramMap (after combining with the internal paramMap)
   */
  private def extractLabeledPoints(dataset: SchemaRDD, paramMap: ParamMap): RDD[LabeledPoint] = {
    import dataset.sqlContext._
    val featuresColName = paramMap(featuresCol)
    val wl = paramMap(weakLearner)
    val featuresRDD: RDD[Vector] = wl match {
      case wlTagged: WeakLearner =>
        val wlParamMap = paramMap(weakLearnerParamMap)
        val wlFeaturesColName = wlParamMap(wl.featuresCol)
        val origFeaturesRDD = dataset.select(featuresColName.attr).as(wlFeaturesColName.attr)
        wlTagged.getNativeFeatureRDD(origFeaturesRDD, wlParamMap)
      case _ =>
        dataset.select(featuresColName.attr).map { case Row(features: Vector) => features }
    }

    val labelColName = paramMap(labelCol)
    if (paramMap.contains(weightCol)) {
      val weightColName = paramMap(weightCol)
      dataset.select(labelColName.attr, weightColName.attr)
        .zip(featuresRDD).map { case (Row(label: Double, weight: Double), features: Vector) =>
          LabeledPoint(label, features, weight)
        }
    } else {
      dataset.select(labelColName.attr)
        .zip(featuresRDD).map { case (Row(label: Double), features: Vector) =>
          LabeledPoint(label, features)
        }
    }
  }

  // From Classifier
  override def fit(dataset: SchemaRDD, paramMap: ParamMap): AdaBoostModel = {
    val map = this.paramMap ++ paramMap
    val labeledPoints: RDD[LabeledPoint] = extractLabeledPoints(dataset, map)
    train(labeledPoints, paramMap)
  }

  // From IterativeEstimator
  override private[ml] def createSolver(dataset: SchemaRDD, paramMap: ParamMap): AdaBoostSolver = {
    val map = this.paramMap ++ paramMap
    val labeledPoints: RDD[LabeledPoint] = extractLabeledPoints(dataset, map)
    new AdaBoostSolver(labeledPoints, this, map)
  }

  // From Predictor
  override def train(dataset: RDD[LabeledPoint], paramMap: ParamMap): AdaBoostModel = {
    val map = this.paramMap ++ paramMap
    val solver = new AdaBoostSolver(dataset, this, map)
    while (solver.step()) { }
    solver.currentModel
  }
}
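Besides the one-shot train above, IterativeEstimator exposes the same loop step-by-step. A sketch of how package-internal code might drive it (createSolver is private[ml]; `ada`, `dataset`, and `ParamMap.empty` are assumptions):

    // Step-wise training via IterativeSolver; only step() and currentModel
    // are taken from this commit, the driver loop itself is hypothetical.
    val solver = ada.createSolver(dataset, ParamMap.empty)
    var continue = true
    while (continue) {
      continue = solver.step()
      // solver.currentModel could be inspected here, e.g. for early stopping
    }
    val model = solver.currentModel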

class AdaBoostModel private[ml] (
    val weakHypotheses: Array[ClassificationModel[_]],
    val weakHypothesisWeights: Array[Double],
    override val parent: AdaBoost,
    override val fittingParamMap: ParamMap)
  extends ClassificationModel[AdaBoostModel]
  with ProbabilisticClassificationModel
  with AdaBoostParams {

  require(weakHypotheses.nonEmpty)
  require(weakHypotheses.size == weakHypothesisWeights.size)

  // From Classifier.Model:
  override val numClasses: Int = weakHypotheses(0).numClasses

  require(weakHypotheses.forall(_.numClasses == numClasses))

  /** Weighted vote of the weak hypotheses, treating each prediction as +/-1. */
  private val margin: Vector => Double = (features) => {
    weakHypotheses.zip(weakHypothesisWeights)
      .foldLeft(0.0) { case (total: Double, (wh: ClassificationModel[_], weight: Double)) =>
        val pred = if (wh.predict(features) == 1.0) 1.0 else -1.0
        total + weight * pred
      }
  }

  /** Logistic mapping of the margin to P(label = 1). */
  private val score: Vector => Double = (features) => {
    val m = margin(features)
    1.0 / (1.0 + math.exp(-2.0 * m))
  }

  override def predictProbabilities(features: Vector): Vector = {
    val s = score(features)
    Vectors.dense(Array(1.0 - s, s))
  }

  override def predictRaw(features: Vector): Vector = {
    val m = margin(features)
    Vectors.dense(Array(-m, m))
  }
}
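The score above is the logistic calibration of the boosting margin, 1 / (1 + e^(-2 * margin)), which is where the two-class probabilities come from. A quick numeric sanity check with a made-up margin:

    // Sanity check of the margin -> probability mapping, with a made-up margin.
    val m = 0.8                                // hypothetical ensemble margin
    val s = 1.0 / (1.0 + math.exp(-2.0 * m))   // P(label = 1), as in `score`
    val probabilities = Array(1.0 - s, s)      // what predictProbabilities returns
    val rawScores = Array(-m, m)               // what predictRaw returns
    assert(math.abs(probabilities.sum - 1.0) < 1e-12)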

private[ml] class AdaBoostSolver(
    val origData: RDD[LabeledPoint],
    val parent: AdaBoost,
    val paramMap: ParamMap) extends IterativeSolver[AdaBoostModel] {

  private val weakHypotheses = new ArrayBuffer[ClassificationModel[_]]
  private val weakHypothesisWeights = new ArrayBuffer[Double]

  private val wl: Classifier[_, _] = paramMap(parent.weakLearner)
  private val wlParamMap = paramMap(parent.weakLearnerParamMap)
  override val maxIterations: Int = paramMap(parent.maxIter)

  // TODO: Decide if this alg should cache data, or if that should be left to the user.

  // TODO: check for weights = 0
  // TODO: EDITING HERE NOW: switch to log weights
  private var logInstanceWeights: RDD[Double] = origData.map(lp => math.log(lp.weight))

  override def stepImpl(): Boolean = ??? /*{
    // Check if the weak learner takes instance weights.
    val wlDataset = wl match {
      case wlWeighted: HasWeightCol =>
        origData.zip(logInstanceWeights).map { case (lp: LabeledPoint, logWeight: Double) =>
          LabeledPoint(lp.label, lp.features, math.exp(logWeight))
        }
      case _ =>
        // Subsample data to simulate the current instance weight distribution.
        // TODO: This needs to be done before AdaBoost is committed.
        throw new NotImplementedError(
          "AdaBoost currently requires that the weak learning algorithm accept instance weights.")
    }
    // Train the weak learning algorithm.
    val weakHypothesis: ClassificationModel[_] = wl match {
      case wlTagged: WeakLearner[_] =>
        // This lets the weak learner know that the features are in its native format.
        wlTagged.trainNative(wlDataset, wlParamMap).asInstanceOf[ClassificationModel[_]]
      case _ =>
        wl.train(wlDataset, wlParamMap).asInstanceOf[ClassificationModel[_]]
    }
    // Add the weighted weak hypothesis to the ensemble.
    // TODO: Handle instance weights.
    val predictionsAndLabels = wlDataset.map(lp => weakHypothesis.predict(lp.features))
      .zip(wlDataset.map(_.label))
    val eps = 1.0 - ClassificationEvaluator.computeMetric(predictionsAndLabels, "accuracy")
    val alpha = 0.5 * (math.log(1.0 - eps) - math.log(eps)) // TODO: handle eps near 0
    weakHypotheses += weakHypothesis
    weakHypothesisWeights += alpha
    // Update weights.
    val newLogInstanceWeights = logInstanceWeights.zip(predictionsAndLabels).map {
      case (logWeight: Double, (pred: Double, label: Double)) =>
        ???
    }

  }*/

  override def currentModel: AdaBoostModel = {
    new AdaBoostModel(weakHypotheses.toArray, weakHypothesisWeights.toArray, parent, paramMap)
  }
}
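For reference, the update the TODOs above are working toward is the standard discrete AdaBoost rule. A self-contained sketch on plain arrays (textbook algorithm, not this commit's API):

    // Textbook discrete AdaBoost step on plain arrays: returns the hypothesis
    // weight alpha and the renormalized instance weights.
    def adaBoostStep(
        labels: Array[Double],        // labels in {-1.0, +1.0}
        predictions: Array[Double],   // weak hypothesis outputs in {-1.0, +1.0}
        weights: Array[Double]): (Double, Array[Double]) = {
      // Weighted training error of the weak hypothesis.
      val eps = labels.indices
        .filter(i => predictions(i) != labels(i))
        .map(i => weights(i))
        .sum / weights.sum
      // Hypothesis weight; as the TODO notes, eps near 0 or 1 needs guarding.
      val alpha = 0.5 * (math.log(1.0 - eps) - math.log(eps))
      // Upweight mistakes, downweight correct predictions, then renormalize.
      val unnormalized = labels.indices.map { i =>
        weights(i) * math.exp(-alpha * labels(i) * predictions(i))
      }
      val z = unnormalized.sum
      (alpha, unnormalized.map(_ / z).toArray)
    }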
Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.ml.classification

import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.evaluation.ClassificationEvaluator
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.ml._
import org.apache.spark.ml.impl.estimator.{HasDefaultEvaluator, PredictionModel, Predictor,
  PredictorParams}
import org.apache.spark.rdd.RDD

@AlphaComponent
private[classification] trait ClassifierParams extends PredictorParams

/**
 * Single-label binary or multiclass classification.
 */
abstract class Classifier[Learner <: Classifier[Learner, M], M <: ClassificationModel[M]]
  extends Predictor[Learner, M]
  with ClassifierParams
  with HasDefaultEvaluator {

  override def defaultEvaluator: Evaluator = new ClassificationEvaluator
}


private[ml] abstract class ClassificationModel[M <: ClassificationModel[M]]
  extends PredictionModel[M] with ClassifierParams {

  /** Number of classes (labels are indexed 0, 1, ..., numClasses - 1). */
  def numClasses: Int

  /**
   * Predict label for the given features. Labels are indexed {0, 1, ..., numClasses - 1}.
   * This default implementation for classification predicts the index of the maximum value
   * from [[predictRaw()]].
   */
  override def predict(features: Vector): Double = {
    predictRaw(features).toArray.zipWithIndex.maxBy(_._1)._2.toDouble
  }

  /**
   * Raw prediction for each possible label.
   * @return vector where element i is the raw score for label i
   */
  def predictRaw(features: Vector): Vector

  /**
   * Compute this model's accuracy on the given dataset.
   */
  def accuracy(dataset: RDD[LabeledPoint]): Double = {
    // TODO: Handle instance weights.
    val predictionsAndLabels = dataset.map(lp => predict(lp.features))
      .zip(dataset.map(_.label))
    ClassificationEvaluator.computeMetric(predictionsAndLabels, "accuracy")
  }
}
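A tiny sketch of the default predict above: the predicted label is the index of the largest raw score (values made up):

    import org.apache.spark.mllib.linalg.Vectors

    // Mirrors the default predict: index of the maximum raw score wins.
    val raw = Vectors.dense(0.1, 2.3, -0.7)
    val predicted = raw.toArray.zipWithIndex.maxBy(_._1)._2.toDouble
    // predicted == 1.0: class 1 has the largest raw score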
