From 84f292bc04eac58dd624a8fd7fce54c18f20cd15 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 31 Mar 2016 16:52:20 -0700 Subject: [PATCH 01/11] initial add for OneVsRest --- python/pyspark/ml/classification.py | 205 +++++++++++++++++++++++++++- 1 file changed, 204 insertions(+), 1 deletion(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 07cafa099374..cf5a3c820e83 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -17,7 +17,10 @@ import warnings +import operator + from pyspark import since +from pyspark.ml import Estimator, Model from pyspark.ml.util import * from pyspark.ml.wrapper import JavaEstimator, JavaModel from pyspark.ml.param import TypeConverters @@ -25,7 +28,9 @@ from pyspark.ml.regression import ( RandomForestParams, TreeEnsembleParams, DecisionTreeModel, TreeEnsembleModels) from pyspark.mllib.common import inherit_doc - +from pyspark.sql.functions import udf, when +from pyspark.sql.types import MapType, IntegerType, DoubleType +from pyspark.storagelevel import StorageLevel __all__ = ['LogisticRegression', 'LogisticRegressionModel', 'DecisionTreeClassifier', 'DecisionTreeClassificationModel', @@ -905,6 +910,204 @@ def weights(self): return self._call_java("weights") +@inherit_doc +class OneVsRest(Estimator, HasFeaturesCol, HasLabelCol, HasPredictionCol): + """ + Reduction of Multiclass Classification to Binary Classification. + Performs reduction using one against all strategy. + For a multiclass classification with k classes, train k models (one per class). + Each example is scored against all k models and the model with highest score + is picked to label the example. + + >>> from pyspark.sql import Row + >>> from pyspark.mllib.linalg import Vectors + >>> df = sc.parallelize([ + ... Row(label=1.0, features=Vectors.dense(1.0)), + ... Row(label=0.0, features=Vectors.sparse(1, [], []))]).toDF() + >>> lr = LogisticRegression(maxIter=5, regParam=0.01) + >>> ovr = OneVsRest(classifier=lr).setPredictionCol("indexed") + >>> model = ovr.fit(df) + >>> model.models[0].weights + >>> model.models[0].coefficients + >>> model.models[0].intercept + # >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF() + # >>> model.transform(test0).head().indexed + # 0.0 + # >>> test1 = sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]).toDF() + # >>> model.transform(test1).head().indexed + # 1.0 + + .. versionadded:: 2.0.0 + """ + + # a placeholder to make it appear in the generated doc + classifier = Param(Params._dummy(), "classifier", "base binary classifier") + + @keyword_only + def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", + classifier=None): + """ + __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ + classifier=None) + """ + super(OneVsRest, self).__init__() + kwargs = self.__init__._input_kwargs + self._set(**kwargs) + + @keyword_only + @since("2.0.0") + def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None): + """ + setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None): + Sets params for OneVsRest. + """ + kwargs = self.setParams._input_kwargs + return self._set(**kwargs) + + @since("2.0.0") + def setClassifier(self, value): + """ + Sets the value of :py:attr:`estimator`. 
+ """ + self._paramMap[self.classifier] = value + return self + + @since("2.0.0") + def getClassifier(self): + """ + Gets the value of classifier or its default value. + """ + return self.getOrDefault(self.classifier) + + def _fit(self, dataset): + + labelCol = self.getLabelCol() + featureCol = self.getFeaturesCol() + numClasses = int(dataset.agg({labelCol: "max"}).head()["max("+labelCol+")"]) + multiclassLabeled = dataset.select(labelCol, featureCol) + + # persist if underlying dataset is not persistent. + handlePersistence =\ + dataset.rdd.getStorageLevel() == StorageLevel(False, False, False, False) + if handlePersistence: + multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK) + + models = [] + + for index in range(0, numClasses): + # newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata() + labelColName = "mc2b$" + str(index) + trainingDataset = multiclassLabeled.withColumn( + labelColName, + when(dataset[self.getLabelCol()] == float(index), 1.0).otherwise(0.0)) + classifier = self.getClassifier() + paramMap = dict([(classifier.labelCol, labelColName), + (classifier.featuresCol, self.getFeaturesCol()), + (classifier.predictionCol, self.getPredictionCol())]) + models.append(classifier.fit(trainingDataset, paramMap)) + + if handlePersistence: + multiclassLabeled.unpersist() + + return OneVsRestModel(models=models) + + # @since("2.0.0") + # def copy(self, extra=None): + # """ + # Creates a copy of this instance with a randomly generated uid + # and some extra params. This copies creates a deep copy of + # the embedded paramMap, and copies the embedded and extra parameters over. + + # :param extra: Extra parameters to copy to the new instance + # :return: Copy of this instance + # """ + # if extra is None: + # extra = dict() + # newCV = Params.copy(self, extra) + # if self.isSet(self.estimator): + # newCV.setEstimator(self.getEstimator().copy(extra)) + # # estimatorParamMaps remain the same + # if self.isSet(self.evaluator): + # newCV.setEvaluator(self.getEvaluator().copy(extra)) + # return newCV + + +class OneVsRestModel(Model, HasFeaturesCol, HasLabelCol, HasPredictionCol): + """ + Model produced by [[OneVsRest]]. + This stores the models resulting from training k binary classifiers: one for each class. + Each example is scored against all k models, and the model with the highest score + is picked to label the example. + + .. versionadded:: 2.0.0 + """ + + def __init__(self, models): + super(OneVsRestModel, self).__init__() + #: best model from cross validation + self.models = models + + def _transform(self, dataset): + # determine the input columns: these need to be passed through + origCols = dataset.columns + + # add an accumulator column to store predictions of all the models + accColName = "mbc$acc" + str(uuid.uuid4()) + initUDF = udf(lambda x: {}, MapType(IntegerType(), DoubleType())) + newDataset = dataset.withColumn(accColName, initUDF()) + + # persist if underlying dataset is not persistent. 
+ handlePersistence =\ + dataset.rdd.getStorageLevel() == StorageLevel(False, False, False, False) + if handlePersistence: + newDataset.persist(StorageLevel.MEMORY_AND_DISK) + + # update the accumulator column with the result of prediction of models + updatedDataset = newDataset + for index, model in enumerate(self.models): + rawPredictionCol = model._call_java("getRawPredictionCol") + columns = origCols + [rawPredictionCol, accColName] + + # add temporary column to store intermediate scores and update + tmpColName = "mbc$tmp" + str(uuid.uuid4()) + updateUDF = \ + udf(lambda predictions, prediction: predictions.update({index: prediction(1)})) + transformedDataset = model.transform(newDataset).select(*columns) + updatedDataset = transformedDataset.withColumn( + tmpColName, + updateUDF(transformedDataset[accColName], transformedDataset[rawPredictionCol])) + newColumns = origCols + [tmpColName] + + # switch out the intermediate column with the accumulator column + updatedDataset.select(*newColumns).withColumnRenamed(tmpColName, accColName) + + if handlePersistence: + newDataset.unpersist() + + # output the index of the classifier with highest confidence as prediction + labelUDF = udf( + lambda predictions: float(max(predictions.iteritems(), key=operator.itemgetter(1))[0])) + + # output label and label metadata as prediction + return updatedDataset.withColumn( + self.getPredictionCol(), labelUDF(accColName)).drop(accColName) + + # @since("1.4.0") + # def copy(self, extra=None): + # """ + # Creates a copy of this instance with a randomly generated uid + # and some extra params. This copies the underlying bestModel, + # creates a deep copy of the embedded paramMap, and + # copies the embedded and extra parameters over. + + # :param extra: Extra parameters to copy to the new instance + # :return: Copy of this instance + # """ + # if extra is None: + # extra = dict() + # return OneVsRestModel(self.models.copy(extra)) + + if __name__ == "__main__": import doctest import pyspark.ml.classification From a296a86a9d600c347774403a97be26f2cc370820 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Fri, 1 Apr 2016 00:09:52 -0700 Subject: [PATCH 02/11] ser/de error --- python/pyspark/ml/classification.py | 40 +++++++++++++++++------------ 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index cf5a3c820e83..c894c6a9d4ec 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -18,6 +18,7 @@ import warnings import operator +import uuid from pyspark import since from pyspark.ml import Estimator, Model @@ -29,7 +30,7 @@ RandomForestParams, TreeEnsembleParams, DecisionTreeModel, TreeEnsembleModels) from pyspark.mllib.common import inherit_doc from pyspark.sql.functions import udf, when -from pyspark.sql.types import MapType, IntegerType, DoubleType +from pyspark.sql.types import ArrayType, MapType, IntegerType, DoubleType from pyspark.storagelevel import StorageLevel __all__ = ['LogisticRegression', 'LogisticRegressionModel', @@ -930,12 +931,9 @@ class OneVsRest(Estimator, HasFeaturesCol, HasLabelCol, HasPredictionCol): >>> model.models[0].weights >>> model.models[0].coefficients >>> model.models[0].intercept - # >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF() - # >>> model.transform(test0).head().indexed - # 0.0 - # >>> test1 = sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]).toDF() - # >>> model.transform(test1).head().indexed - # 1.0 + >>> test0 = 
sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF() + >>> model.transform(test0).show() + 0.0 .. versionadded:: 2.0.0 """ @@ -1053,8 +1051,10 @@ def _transform(self, dataset): # add an accumulator column to store predictions of all the models accColName = "mbc$acc" + str(uuid.uuid4()) - initUDF = udf(lambda x: {}, MapType(IntegerType(), DoubleType())) - newDataset = dataset.withColumn(accColName, initUDF()) + initUDF = udf(lambda _: [], ArrayType(DoubleType())) + newDataset = dataset.withColumn(accColName, initUDF(dataset[origCols[0]])) + + newDataset.show() # persist if underlying dataset is not persistent. handlePersistence =\ @@ -1062,17 +1062,22 @@ def _transform(self, dataset): if handlePersistence: newDataset.persist(StorageLevel.MEMORY_AND_DISK) + def updateDict(predictions, i, prediction): + predictions[i] = prediction[1] + return predictions + # update the accumulator column with the result of prediction of models - updatedDataset = newDataset + aggregatedDataset = newDataset for index, model in enumerate(self.models): rawPredictionCol = model._call_java("getRawPredictionCol") columns = origCols + [rawPredictionCol, accColName] # add temporary column to store intermediate scores and update tmpColName = "mbc$tmp" + str(uuid.uuid4()) - updateUDF = \ - udf(lambda predictions, prediction: predictions.update({index: prediction(1)})) - transformedDataset = model.transform(newDataset).select(*columns) + updateUDF = udf( + lambda predictions, prediction: predictions + [prediction[1]], + ArrayType(DoubleType())) + transformedDataset = model.transform(aggregatedDataset).select(*columns) updatedDataset = transformedDataset.withColumn( tmpColName, updateUDF(transformedDataset[accColName], transformedDataset[rawPredictionCol])) @@ -1080,17 +1085,18 @@ def _transform(self, dataset): # switch out the intermediate column with the accumulator column updatedDataset.select(*newColumns).withColumnRenamed(tmpColName, accColName) + aggregatedDataset = updatedDataset if handlePersistence: newDataset.unpersist() + return aggregatedDataset # output the index of the classifier with highest confidence as prediction - labelUDF = udf( - lambda predictions: float(max(predictions.iteritems(), key=operator.itemgetter(1))[0])) + # labelUDF = udf(lambda predictions: float(max(predictions, key=predictions.get))) # output label and label metadata as prediction - return updatedDataset.withColumn( - self.getPredictionCol(), labelUDF(accColName)).drop(accColName) + # return aggregatedDataset.withColumn( + # self.getPredictionCol(), labelUDF(aggregatedDataset[accColName])).drop(accColName) # @since("1.4.0") # def copy(self, extra=None): From 417d13f34dcf323559e3885960d6633d98da75c0 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Fri, 1 Apr 2016 18:26:54 -0700 Subject: [PATCH 03/11] fix error caused by treating nparray as list --- python/pyspark/ml/classification.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index c894c6a9d4ec..e003c9ffa781 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -1054,18 +1054,12 @@ def _transform(self, dataset): initUDF = udf(lambda _: [], ArrayType(DoubleType())) newDataset = dataset.withColumn(accColName, initUDF(dataset[origCols[0]])) - newDataset.show() - # persist if underlying dataset is not persistent. 
handlePersistence =\ dataset.rdd.getStorageLevel() == StorageLevel(False, False, False, False) if handlePersistence: newDataset.persist(StorageLevel.MEMORY_AND_DISK) - def updateDict(predictions, i, prediction): - predictions[i] = prediction[1] - return predictions - # update the accumulator column with the result of prediction of models aggregatedDataset = newDataset for index, model in enumerate(self.models): @@ -1075,7 +1069,7 @@ def updateDict(predictions, i, prediction): # add temporary column to store intermediate scores and update tmpColName = "mbc$tmp" + str(uuid.uuid4()) updateUDF = udf( - lambda predictions, prediction: predictions + [prediction[1]], + lambda predictions, prediction: predictions + [prediction.tolist()[1]], ArrayType(DoubleType())) transformedDataset = model.transform(aggregatedDataset).select(*columns) updatedDataset = transformedDataset.withColumn( @@ -1084,19 +1078,19 @@ def updateDict(predictions, i, prediction): newColumns = origCols + [tmpColName] # switch out the intermediate column with the accumulator column - updatedDataset.select(*newColumns).withColumnRenamed(tmpColName, accColName) - aggregatedDataset = updatedDataset + aggregatedDataset = updatedDataset\ + .select(*newColumns).withColumnRenamed(tmpColName, accColName) if handlePersistence: newDataset.unpersist() - return aggregatedDataset # output the index of the classifier with highest confidence as prediction - # labelUDF = udf(lambda predictions: float(max(predictions, key=predictions.get))) + labelUDF = udf( + lambda predictions: float(max(enumerate(predictions), key=operator.itemgetter(1))[0])) # output label and label metadata as prediction - # return aggregatedDataset.withColumn( - # self.getPredictionCol(), labelUDF(aggregatedDataset[accColName])).drop(accColName) + return aggregatedDataset.withColumn( + self.getPredictionCol(), labelUDF(aggregatedDataset[accColName])).drop(accColName) # @since("1.4.0") # def copy(self, extra=None): From 6d30d772ca619bf2a331f361764a036ab4f4603c Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Fri, 1 Apr 2016 18:36:34 -0700 Subject: [PATCH 04/11] add copy and more tests --- python/pyspark/ml/classification.py | 110 ++++++++++++++-------------- 1 file changed, 54 insertions(+), 56 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index e003c9ffa781..7179703e7350 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -15,22 +15,18 @@ # limitations under the License. 
# -import warnings - import operator -import uuid +import warnings -from pyspark import since from pyspark.ml import Estimator, Model -from pyspark.ml.util import * -from pyspark.ml.wrapper import JavaEstimator, JavaModel -from pyspark.ml.param import TypeConverters from pyspark.ml.param.shared import * from pyspark.ml.regression import ( RandomForestParams, TreeEnsembleParams, DecisionTreeModel, TreeEnsembleModels) +from pyspark.ml.util import * +from pyspark.ml.wrapper import JavaEstimator, JavaModel from pyspark.mllib.common import inherit_doc from pyspark.sql.functions import udf, when -from pyspark.sql.types import ArrayType, MapType, IntegerType, DoubleType +from pyspark.sql.types import ArrayType, DoubleType from pyspark.storagelevel import StorageLevel __all__ = ['LogisticRegression', 'LogisticRegressionModel', @@ -38,7 +34,8 @@ 'GBTClassifier', 'GBTClassificationModel', 'RandomForestClassifier', 'RandomForestClassificationModel', 'NaiveBayes', 'NaiveBayesModel', - 'MultilayerPerceptronClassifier', 'MultilayerPerceptronClassificationModel'] + 'MultilayerPerceptronClassifier', 'MultilayerPerceptronClassificationModel', + 'OneVsRest', 'OneVsRestModel'] @inherit_doc @@ -923,16 +920,17 @@ class OneVsRest(Estimator, HasFeaturesCol, HasLabelCol, HasPredictionCol): >>> from pyspark.sql import Row >>> from pyspark.mllib.linalg import Vectors >>> df = sc.parallelize([ - ... Row(label=1.0, features=Vectors.dense(1.0)), - ... Row(label=0.0, features=Vectors.sparse(1, [], []))]).toDF() + ... Row(label=0.0, features=Vectors.dense(1.0, 0.8)), + ... Row(label=1.0, features=Vectors.sparse(2, [], [])), + ... Row(label=2.0, features=Vectors.dense(0.5, 0.5))]).toDF() >>> lr = LogisticRegression(maxIter=5, regParam=0.01) >>> ovr = OneVsRest(classifier=lr).setPredictionCol("indexed") >>> model = ovr.fit(df) - >>> model.models[0].weights - >>> model.models[0].coefficients - >>> model.models[0].intercept - >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF() - >>> model.transform(test0).show() + >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0))]).toDF() + >>> model.transform(test0).head().indexed + 1.0 + >>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF() + >>> model.transform(test1).head().indexed 0.0 .. versionadded:: 2.0.0 @@ -965,7 +963,7 @@ def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classif @since("2.0.0") def setClassifier(self, value): """ - Sets the value of :py:attr:`estimator`. + Sets the value of :py:attr:`classifier`. """ self._paramMap[self.classifier] = value return self @@ -985,7 +983,7 @@ def _fit(self, dataset): multiclassLabeled = dataset.select(labelCol, featureCol) # persist if underlying dataset is not persistent. - handlePersistence =\ + handlePersistence = \ dataset.rdd.getStorageLevel() == StorageLevel(False, False, False, False) if handlePersistence: multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK) @@ -1007,32 +1005,32 @@ def _fit(self, dataset): if handlePersistence: multiclassLabeled.unpersist() - return OneVsRestModel(models=models) - - # @since("2.0.0") - # def copy(self, extra=None): - # """ - # Creates a copy of this instance with a randomly generated uid - # and some extra params. This copies creates a deep copy of - # the embedded paramMap, and copies the embedded and extra parameters over. 
- - # :param extra: Extra parameters to copy to the new instance - # :return: Copy of this instance - # """ - # if extra is None: - # extra = dict() - # newCV = Params.copy(self, extra) - # if self.isSet(self.estimator): - # newCV.setEstimator(self.getEstimator().copy(extra)) - # # estimatorParamMaps remain the same - # if self.isSet(self.evaluator): - # newCV.setEvaluator(self.getEvaluator().copy(extra)) - # return newCV + return OneVsRestModel(models=models)\ + .setFeaturesCol(self.getFeaturesCol())\ + .setLabelCol(self.getLabelCol())\ + .setPredictionCol(self.getPredictionCol()) + + @since("2.0.0") + def copy(self, extra=None): + """ + Creates a copy of this instance with a randomly generated uid + and some extra params. This copies creates a deep copy of + the embedded paramMap, and copies the embedded and extra parameters over. + + :param extra: Extra parameters to copy to the new instance + :return: Copy of this instance + """ + if extra is None: + extra = dict() + newOVR = Params.copy(self, extra) + if self.isSet(self.classifier): + newOVR.setClassifier(self.getClassifier().copy(extra)) + return newOVR class OneVsRestModel(Model, HasFeaturesCol, HasLabelCol, HasPredictionCol): """ - Model produced by [[OneVsRest]]. + Model fitted by OneVsRest. This stores the models resulting from training k binary classifiers: one for each class. Each example is scored against all k models, and the model with the highest score is picked to label the example. @@ -1055,7 +1053,7 @@ def _transform(self, dataset): newDataset = dataset.withColumn(accColName, initUDF(dataset[origCols[0]])) # persist if underlying dataset is not persistent. - handlePersistence =\ + handlePersistence = \ dataset.rdd.getStorageLevel() == StorageLevel(False, False, False, False) if handlePersistence: newDataset.persist(StorageLevel.MEMORY_AND_DISK) @@ -1086,26 +1084,26 @@ def _transform(self, dataset): # output the index of the classifier with highest confidence as prediction labelUDF = udf( - lambda predictions: float(max(enumerate(predictions), key=operator.itemgetter(1))[0])) + lambda predictions: float(max(enumerate(predictions), key=operator.itemgetter(1))[0]), + DoubleType()) # output label and label metadata as prediction return aggregatedDataset.withColumn( self.getPredictionCol(), labelUDF(aggregatedDataset[accColName])).drop(accColName) - # @since("1.4.0") - # def copy(self, extra=None): - # """ - # Creates a copy of this instance with a randomly generated uid - # and some extra params. This copies the underlying bestModel, - # creates a deep copy of the embedded paramMap, and - # copies the embedded and extra parameters over. - - # :param extra: Extra parameters to copy to the new instance - # :return: Copy of this instance - # """ - # if extra is None: - # extra = dict() - # return OneVsRestModel(self.models.copy(extra)) + @since("2.0.0") + def copy(self, extra=None): + """ + Creates a copy of this instance with a randomly generated uid + and some extra params. This copies creates a deep copy of + the embedded paramMap, and copies the embedded and extra parameters over. 
+ + :param extra: Extra parameters to copy to the new instance + :return: Copy of this instance + """ + if extra is None: + extra = dict() + return OneVsRestModel([model.copy(extra) for model in self.models.copy(extra)]) if __name__ == "__main__": From b17cc7b8cb33af7bebb444832a2b7fd9e961ea93 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Sun, 3 Apr 2016 17:27:42 -0700 Subject: [PATCH 05/11] fix nits --- python/pyspark/ml/classification.py | 38 ++++++++++++++++------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 7179703e7350..36aa894fdd71 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -932,6 +932,9 @@ class OneVsRest(Estimator, HasFeaturesCol, HasLabelCol, HasPredictionCol): >>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF() >>> model.transform(test1).head().indexed 0.0 + >>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5, 0.4))]).toDF() + >>> model.transform(test2).head().indexed + 2.0 .. versionadded:: 2.0.0 """ @@ -976,11 +979,13 @@ def getClassifier(self): return self.getOrDefault(self.classifier) def _fit(self, dataset): - labelCol = self.getLabelCol() - featureCol = self.getFeaturesCol() - numClasses = int(dataset.agg({labelCol: "max"}).head()["max("+labelCol+")"]) - multiclassLabeled = dataset.select(labelCol, featureCol) + featuresCol = self.getFeaturesCol() + predictionCol = self.getPredictionCol() + + numClasses = int(dataset.agg({labelCol: "max"}).head()["max("+labelCol+")"]) + 1 + + multiclassLabeled = dataset.select(labelCol, featuresCol) # persist if underlying dataset is not persistent. handlePersistence = \ @@ -991,24 +996,23 @@ def _fit(self, dataset): models = [] for index in range(0, numClasses): - # newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata() - labelColName = "mc2b$" + str(index) + binaryLabelCol = "mc2b$" + str(index) trainingDataset = multiclassLabeled.withColumn( - labelColName, - when(dataset[self.getLabelCol()] == float(index), 1.0).otherwise(0.0)) + binaryLabelCol, + when(dataset[labelCol] == float(index), 1.0).otherwise(0.0)) classifier = self.getClassifier() - paramMap = dict([(classifier.labelCol, labelColName), - (classifier.featuresCol, self.getFeaturesCol()), - (classifier.predictionCol, self.getPredictionCol())]) + paramMap = dict([(classifier.labelCol, binaryLabelCol), + (classifier.featuresCol, featuresCol), + (classifier.predictionCol, predictionCol)]) models.append(classifier.fit(trainingDataset, paramMap)) if handlePersistence: multiclassLabeled.unpersist() return OneVsRestModel(models=models)\ - .setFeaturesCol(self.getFeaturesCol())\ - .setLabelCol(self.getLabelCol())\ - .setPredictionCol(self.getPredictionCol()) + .setFeaturesCol(featuresCol)\ + .setLabelCol(labelCol)\ + .setPredictionCol(predictionCol) @since("2.0.0") def copy(self, extra=None): @@ -1022,10 +1026,10 @@ def copy(self, extra=None): """ if extra is None: extra = dict() - newOVR = Params.copy(self, extra) + newOvr = Params.copy(self, extra) if self.isSet(self.classifier): - newOVR.setClassifier(self.getClassifier().copy(extra)) - return newOVR + newOvr.setClassifier(self.getClassifier().copy(extra)) + return newOvr class OneVsRestModel(Model, HasFeaturesCol, HasLabelCol, HasPredictionCol): From 47bd7091a75ee6dac34674240acc9a594b157ccd Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 6 Apr 2016 15:05:22 -0700 Subject: [PATCH 06/11] fix multi-thread issue --- 
python/pyspark/ml/classification.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 36aa894fdd71..0aa3955eb574 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -17,6 +17,7 @@ import operator import warnings +from multiprocessing.dummy import Pool from pyspark.ml import Estimator, Model from pyspark.ml.param.shared import * @@ -926,6 +927,10 @@ class OneVsRest(Estimator, HasFeaturesCol, HasLabelCol, HasPredictionCol): >>> lr = LogisticRegression(maxIter=5, regParam=0.01) >>> ovr = OneVsRest(classifier=lr).setPredictionCol("indexed") >>> model = ovr.fit(df) + >>> [x.coefficients for x in model.models] + [DenseVector([3.3925, 1.8785]), DenseVector([-4.3016, -6.3163]), DenseVector([-4.5855, 6.1785])] + >>> [x.intercept for x in model.models] + [-3.6474708290602034, 2.5507881951814495, -1.1016513228162115] >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0))]).toDF() >>> model.transform(test0).head().indexed 1.0 @@ -982,6 +987,7 @@ def _fit(self, dataset): labelCol = self.getLabelCol() featuresCol = self.getFeaturesCol() predictionCol = self.getPredictionCol() + classifier = self.getClassifier() numClasses = int(dataset.agg({labelCol: "max"}).head()["max("+labelCol+")"]) + 1 @@ -993,18 +999,21 @@ def _fit(self, dataset): if handlePersistence: multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK) - models = [] - - for index in range(0, numClasses): + def trainSingleClass(index): binaryLabelCol = "mc2b$" + str(index) trainingDataset = multiclassLabeled.withColumn( binaryLabelCol, - when(dataset[labelCol] == float(index), 1.0).otherwise(0.0)) - classifier = self.getClassifier() + when(multiclassLabeled[labelCol] == float(index), 1.0).otherwise(0.0)) paramMap = dict([(classifier.labelCol, binaryLabelCol), (classifier.featuresCol, featuresCol), (classifier.predictionCol, predictionCol)]) - models.append(classifier.fit(trainingDataset, paramMap)) + duplicatedClassifier = classifier.__class__() + duplicatedClassifier._resetUid(classifier.uid) + classifier._copyValues(duplicatedClassifier) + return duplicatedClassifier.fit(trainingDataset, paramMap) + + pool = Pool() + models = pool.map(trainSingleClass, range(numClasses)) if handlePersistence: multiclassLabeled.unpersist() From cf4df64d90cc00ac8a3a137088f8dab8c6650116 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 6 Apr 2016 21:16:29 -0700 Subject: [PATCH 07/11] revert non-parallel process --- python/pyspark/ml/classification.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 59832387dc5e..be3223e6d719 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -17,7 +17,6 @@ import operator import warnings -from multiprocessing.dummy import Pool from pyspark.ml import Estimator, Model from pyspark.ml.param.shared import * @@ -1202,6 +1201,9 @@ def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classif def setClassifier(self, value): """ Sets the value of :py:attr:`classifier`. + + .. note:: Only LogisticRegression, NaiveBayes and MultilayerPerceptronClassifier are + supported now. 
""" self._paramMap[self.classifier] = value return self @@ -1237,13 +1239,10 @@ def trainSingleClass(index): paramMap = dict([(classifier.labelCol, binaryLabelCol), (classifier.featuresCol, featuresCol), (classifier.predictionCol, predictionCol)]) - duplicatedClassifier = classifier.__class__() - duplicatedClassifier._resetUid(classifier.uid) - classifier._copyValues(duplicatedClassifier) - return duplicatedClassifier.fit(trainingDataset, paramMap) + return classifier.fit(trainingDataset, paramMap) - pool = Pool() - models = pool.map(trainSingleClass, range(numClasses)) + # TODO: Parallel training for all classes. + models = [trainSingleClass(i) for i in range(numClasses)] if handlePersistence: multiclassLabeled.unpersist() From fd4fc11d1b954584cf06aeb68bfe8ad982519311 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 6 Apr 2016 21:23:19 -0700 Subject: [PATCH 08/11] use copyValues --- python/pyspark/ml/classification.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index be3223e6d719..5640d4c3ac8b 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -1247,10 +1247,7 @@ def trainSingleClass(index): if handlePersistence: multiclassLabeled.unpersist() - return OneVsRestModel(models=models)\ - .setFeaturesCol(featuresCol)\ - .setLabelCol(labelCol)\ - .setPredictionCol(predictionCol) + return self._copyValues(OneVsRestModel(models=models)) @since("2.0.0") def copy(self, extra=None): @@ -1264,10 +1261,7 @@ def copy(self, extra=None): """ if extra is None: extra = dict() - newOvr = Params.copy(self, extra) - if self.isSet(self.classifier): - newOvr.setClassifier(self.getClassifier().copy(extra)) - return newOvr + return self._copyValues(OneVsRest(self.getClassifier().copy(extra))) class OneVsRestModel(Model, HasFeaturesCol, HasLabelCol, HasPredictionCol): @@ -1345,7 +1339,7 @@ def copy(self, extra=None): """ if extra is None: extra = dict() - return OneVsRestModel([model.copy(extra) for model in self.models.copy(extra)]) + return self._copyValues(OneVsRestModel([model.copy(extra) for model in self.models])) if __name__ == "__main__": From fb337cff3dfc591b9b7158730edf9fb12236f9ae Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 14 Apr 2016 14:41:21 -0700 Subject: [PATCH 09/11] fix nits --- python/pyspark/ml/classification.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index cd3621992fc8..8f0cb2b7a820 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -1165,26 +1165,25 @@ class OneVsRest(Estimator, HasFeaturesCol, HasLabelCol, HasPredictionCol): ... Row(label=1.0, features=Vectors.sparse(2, [], [])), ... 
Row(label=2.0, features=Vectors.dense(0.5, 0.5))]).toDF() >>> lr = LogisticRegression(maxIter=5, regParam=0.01) - >>> ovr = OneVsRest(classifier=lr).setPredictionCol("indexed") + >>> ovr = OneVsRest(classifier=lr) >>> model = ovr.fit(df) >>> [x.coefficients for x in model.models] [DenseVector([3.3925, 1.8785]), DenseVector([-4.3016, -6.3163]), DenseVector([-4.5855, 6.1785])] >>> [x.intercept for x in model.models] [-3.6474708290602034, 2.5507881951814495, -1.1016513228162115] >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0))]).toDF() - >>> model.transform(test0).head().indexed + >>> model.transform(test0).head().prediction 1.0 >>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF() - >>> model.transform(test1).head().indexed + >>> model.transform(test1).head().prediction 0.0 >>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5, 0.4))]).toDF() - >>> model.transform(test2).head().indexed + >>> model.transform(test2).head().prediction 2.0 .. versionadded:: 2.0.0 """ - # a placeholder to make it appear in the generated doc classifier = Param(Params._dummy(), "classifier", "base binary classifier") @keyword_only @@ -1213,10 +1212,9 @@ def setClassifier(self, value): """ Sets the value of :py:attr:`classifier`. - .. note:: Only LogisticRegression, NaiveBayes and MultilayerPerceptronClassifier are - supported now. + .. note:: Only LogisticRegression and NaiveBayes are supported now. """ - self._paramMap[self.classifier] = value + self._set(classifier=value) return self @since("2.0.0") @@ -1231,6 +1229,8 @@ def _fit(self, dataset): featuresCol = self.getFeaturesCol() predictionCol = self.getPredictionCol() classifier = self.getClassifier() + assert isinstance(classifier, HasRawPredictionCol),\ + "Classifier %s doesn't extend from HasRawPredictionCol." % type(classifier) numClasses = int(dataset.agg({labelCol: "max"}).head()["max("+labelCol+")"]) + 1 @@ -1264,8 +1264,8 @@ def trainSingleClass(index): def copy(self, extra=None): """ Creates a copy of this instance with a randomly generated uid - and some extra params. This copies creates a deep copy of - the embedded paramMap, and copies the embedded and extra parameters over. + and some extra params. This creates a deep copy of the embedded paramMap, + and copies the embedded and extra parameters over. 
:param extra: Extra parameters to copy to the new instance :return: Copy of this instance @@ -1290,7 +1290,6 @@ class OneVsRestModel(Model, HasFeaturesCol, HasLabelCol, HasPredictionCol): def __init__(self, models): super(OneVsRestModel, self).__init__() - #: best model from cross validation self.models = models def _transform(self, dataset): From e0cf36f7910102c9aefa970a0acef1a097c6b982 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 14 Apr 2016 16:28:42 -0700 Subject: [PATCH 10/11] add unit tests --- python/pyspark/ml/classification.py | 4 +++- python/pyspark/ml/tests.py | 32 ++++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 8f0cb2b7a820..21122a36c5a6 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -1352,7 +1352,9 @@ def copy(self, extra=None): """ if extra is None: extra = dict() - return self._copyValues(OneVsRestModel([model.copy(extra) for model in self.models])) + newModel = Params.copy(self, extra) + newModel.models = [model.copy(extra) for model in self.models] + return newModel if __name__ == "__main__": diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index bcbeacbe8049..c8582ea899c9 100644 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -42,7 +42,7 @@ import numpy as np from pyspark.ml import Estimator, Model, Pipeline, PipelineModel, Transformer -from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier +from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, OneVsRest from pyspark.ml.clustering import KMeans from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator from pyspark.ml.feature import * @@ -831,6 +831,36 @@ def test_logistic_regression_summary(self): self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC) +class OneVsRestTests(PySparkTestCase): + + def test_copy(self): + sqlContext = SQLContext(self.sc) + df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + (1.0, Vectors.sparse(2, [], [])), + (2.0, Vectors.dense(0.5, 0.5))], + ["label", "features"]) + lr = LogisticRegression(maxIter=5, regParam=0.01) + ovr = OneVsRest(classifier=lr) + ovr1 = ovr.copy({lr.maxIter: 10}) + self.assertEqual(ovr.getClassifier().getMaxIter(), 5) + self.assertEqual(ovr1.getClassifier().getMaxIter(), 10) + model = ovr.fit(df) + model1 = model.copy({model.predictionCol: "indexed"}) + self.assertEqual(model1.getPredictionCol(), "indexed") + + def test_output_columns(self): + sqlContext = SQLContext(self.sc) + df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + (1.0, Vectors.sparse(2, [], [])), + (2.0, Vectors.dense(0.5, 0.5))], + ["label", "features"]) + lr = LogisticRegression(maxIter=5, regParam=0.01) + ovr = OneVsRest(classifier=lr) + model = ovr.fit(df) + output = model.transform(df) + self.assertEqual(output.columns, ["label", "features", "prediction"]) + + if __name__ == "__main__": from pyspark.ml.tests import * if xmlrunner: From 4e95ecb05b08a96d37fd3fbf6212b2f743a79af4 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 14 Apr 2016 16:34:44 -0700 Subject: [PATCH 11/11] fix typo --- python/pyspark/ml/classification.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 21122a36c5a6..d5a195cfeec9 100644 --- a/python/pyspark/ml/classification.py +++ 
b/python/pyspark/ml/classification.py @@ -1344,8 +1344,8 @@ def _transform(self, dataset): def copy(self, extra=None): """ Creates a copy of this instance with a randomly generated uid - and some extra params. This copies creates a deep copy of - the embedded paramMap, and copies the embedded and extra parameters over. + and some extra params. This creates a deep copy of the embedded paramMap, + and copies the embedded and extra parameters over. :param extra: Extra parameters to copy to the new instance :return: Copy of this instance
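Usage sketch (not part of the patch series): the snippet below shows, end to end, how the OneVsRest API assembled by these patches would be driven from user code, assuming the final state of the series (patch 11/11), a Spark 2.0-era PySpark build, and an existing SparkContext `sc` with a SQLContext `sqlContext`. The data, column names, and hyperparameters mirror the doctest and the OneVsRestTests added in patches 05 and 10 and are illustrative only.

    from pyspark.ml.classification import LogisticRegression, OneVsRest
    from pyspark.mllib.linalg import Vectors

    # Three-class toy dataset, matching the doctest in the patches above.
    df = sqlContext.createDataFrame(
        [(0.0, Vectors.dense(1.0, 0.8)),
         (1.0, Vectors.sparse(2, [], [])),
         (2.0, Vectors.dense(0.5, 0.5))],
        ["label", "features"])

    # The base binary classifier is fit once per class label;
    # model.models holds the k fitted binary models, one per label index.
    lr = LogisticRegression(maxIter=5, regParam=0.01)
    ovr = OneVsRest(classifier=lr)
    model = ovr.fit(df)
    print(len(model.models))  # 3

    # transform() scores every row against all k models and writes the index
    # of the highest-scoring model to the prediction column.
    test = sqlContext.createDataFrame([(Vectors.dense(-1.0, 0.0),)], ["features"])
    print(model.transform(test).head().prediction)  # 1.0, per the doctest above

    # copy() clones the estimator together with its embedded classifier params,
    # as exercised by OneVsRestTests.test_copy in patch 10/11.
    ovr10 = ovr.copy({lr.maxIter: 10})
    print(ovr10.getClassifier().getMaxIter())  # 10, while ovr keeps maxIter=5

Design note from within the series: patch 06/11 trained the per-class models on a multiprocessing.dummy thread pool, and patch 07/11 reverts that in favor of a sequential loop with a TODO for parallel training, so in the final state the k classifiers are fit one after another.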