From 5c8ebe5536f0d7adcd9f3b88d3c1b9a6e9ef910e Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Mon, 4 May 2015 23:42:51 +0800 Subject: [PATCH 1/5] Python API for IsotonicRegression --- .../mllib/api/python/PythonMLLibAPI.scala | 16 ++++ .../mllib/regression/IsotonicRegression.scala | 8 ++ python/pyspark/mllib/regression.py | 75 ++++++++++++++++++- 3 files changed, 98 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 6237b64c8f984..222e1599ba1f3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -282,6 +282,22 @@ private[python] class PythonMLLibAPI extends Serializable { map(_.asInstanceOf[Object]).asJava } + /** + * Java stub for Python mllib IsotonicRegression.run() + */ + def trainIsotonicRegressionModel( + data: JavaRDD[Vector], + isotonic: Boolean): JList[Object] = { + val isotonicRegressionAlg = new IsotonicRegression().setIsotonic(isotonic) + try { + val model = isotonicRegressionAlg.run(data.rdd.map(_.toArray).map { + x => (x(0), x(1), x(2)) }.persist(StorageLevel.MEMORY_AND_DISK)) + List(model.boundaries, model.predictions).map(_.asInstanceOf[Object]).asJava + } finally { + data.rdd.unpersist(blocking = false) + } + } + /** * Java stub for Python mllib KMeans.run() */ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 1d7617046b6c7..c59b4e0a551cb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -21,6 +21,7 @@ import java.io.Serializable import java.lang.{Double => JDouble} import java.util.Arrays.binarySearch +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.json4s._ @@ -57,6 +58,13 @@ class IsotonicRegressionModel ( assertOrdered(boundaries) assertOrdered(predictions)(predictionOrd) + /** A Java-friendly constructor that takes two Iterable parameters and one Boolean parameter. */ + def this(boundaries: java.lang.Iterable[Double], + predictions: java.lang.Iterable[Double], + isotonic: java.lang.Boolean) = { + this(boundaries.asScala.toArray, predictions.asScala.toArray, isotonic) + } + /** Asserts the input array is monotone with the given ordering. */ private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = { var i = 1 diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 4bc6351bdf02f..407d64d559a2a 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -18,8 +18,9 @@ import numpy as np from numpy import array +from pyspark import RDD from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc -from pyspark.mllib.linalg import SparseVector, _convert_to_vector +from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector from pyspark.mllib.util import Saveable, Loader __all__ = ['LabeledPoint', 'LinearModel', @@ -396,6 +397,78 @@ def train(rdd, i): return _regression_train_wrapper(train, RidgeRegressionModel, data, initialWeights) +class IsotonicRegressionModel(Saveable, Loader): + + """Regression model for isotonic regression. + + >>> data = [(2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)] + >>> irm = IsotonicRegression.train(sc.parallelize(data)) + >>> irm.predict(1.5) + 2.0 + >>> irm.predict(2.5) + 4.5 + >>> irm.predict(4) + 6.0 + >>> irm.predict(sc.parallelize([1.5, 2.5, 4])).collect() + [2.0, 4.5, 6.0] + >>> import os, tempfile + >>> path = tempfile.mkdtemp() + >>> irm.save(sc, path) + >>> sameModel = IsotonicRegressionModel.load(sc, path) + >>> sameModel.predict(1.5) + 2.0 + >>> sameModel.predict(2.5) + 4.5 + >>> sameModel.predict(4) + 6.0 + >>> try: + ... os.removedirs(path) + ... except OSError: + ... pass + """ + + def __init__(self, boundaries, predictions, isotonic): + self.boundaries = boundaries + self.predictions = predictions + self.isotonic = isotonic + + def predict(self, x): + if isinstance(x, RDD): + return x.map(lambda v: self.predict(v)) + return np.interp(x, self.boundaries, self.predictions) + + def save(self, sc, path): + java_boundaries = _py2java(sc, self.boundaries.tolist()) + java_predictions = _py2java(sc, self.predictions.tolist()) + java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel( + java_boundaries, java_predictions, self.isotonic) + java_model.save(sc._jsc.sc(), path) + + @classmethod + def load(cls, sc, path): + java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load( + sc._jsc.sc(), path) + py_boundaries = _java2py(sc, java_model.boundaries()) + py_predictions = _java2py(sc, java_model.predictions()) + return IsotonicRegressionModel(np.array(py_boundaries), + np.array(py_predictions), java_model.isotonic) + + +class IsotonicRegression(object): + """ + Run IsotonicRegression algorithm to obtain isotonic regression model. + + :param data: RDD of data points + :param isotonic: Whether this is isotonic or antitonic. + """ + @classmethod + def train(cls, data, isotonic=True): + """Train a isotonic regression model on the given data.""" + boundaries, predictions = callMLlibFunc("trainIsotonicRegressionModel", + data.map(_convert_to_vector), bool(isotonic)) + return IsotonicRegressionModel(np.array(boundaries), np.array(predictions), isotonic) + + def _test(): import doctest from pyspark import SparkContext From 8214bbb9ebf6cae2a623f708dee3460dd5b30489 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Mon, 4 May 2015 23:47:12 +0800 Subject: [PATCH 2/5] fix code style --- .../org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 4 ++-- .../apache/spark/mllib/regression/IsotonicRegression.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 222e1599ba1f3..84cff9bbf61d1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -286,8 +286,8 @@ private[python] class PythonMLLibAPI extends Serializable { * Java stub for Python mllib IsotonicRegression.run() */ def trainIsotonicRegressionModel( - data: JavaRDD[Vector], - isotonic: Boolean): JList[Object] = { + data: JavaRDD[Vector], + isotonic: Boolean): JList[Object] = { val isotonicRegressionAlg = new IsotonicRegression().setIsotonic(isotonic) try { val model = isotonicRegressionAlg.run(data.rdd.map(_.toArray).map { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index c59b4e0a551cb..15a0760f5e6c1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -60,8 +60,8 @@ class IsotonicRegressionModel ( /** A Java-friendly constructor that takes two Iterable parameters and one Boolean parameter. */ def this(boundaries: java.lang.Iterable[Double], - predictions: java.lang.Iterable[Double], - isotonic: java.lang.Boolean) = { + predictions: java.lang.Iterable[Double], + isotonic: java.lang.Boolean) = { this(boundaries.asScala.toArray, predictions.asScala.toArray, isotonic) } From ec09412ada43bb9f6832e9b0e178f31013e0302a Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Mon, 4 May 2015 23:57:45 +0800 Subject: [PATCH 3/5] fix typos --- python/pyspark/mllib/regression.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 407d64d559a2a..f597f993645b9 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -26,7 +26,8 @@ __all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'LinearRegressionWithSGD', 'RidgeRegressionModel', 'RidgeRegressionWithSGD', - 'LassoModel', 'LassoWithSGD'] + 'LassoModel', 'LassoWithSGD', 'IsotonicRegressionModel', + 'IsotonicRegression'] class LabeledPoint(object): From 4bccfee5ecd3085ed467e186a52088fbcec1cafa Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 5 May 2015 18:42:16 +0800 Subject: [PATCH 4/5] fix doctest --- python/pyspark/mllib/regression.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index f597f993645b9..8d6e3d85a4341 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -402,26 +402,22 @@ class IsotonicRegressionModel(Saveable, Loader): """Regression model for isotonic regression. - >>> data = [(2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)] + >>> data = [(1, 0, 1), (2, 1, 1), (3, 2, 1), (1, 3, 1), (6, 4, 1), (17, 5, 1), (16, 6, 1)] >>> irm = IsotonicRegression.train(sc.parallelize(data)) - >>> irm.predict(1.5) + >>> irm.predict(3) 2.0 - >>> irm.predict(2.5) - 4.5 - >>> irm.predict(4) - 6.0 - >>> irm.predict(sc.parallelize([1.5, 2.5, 4])).collect() - [2.0, 4.5, 6.0] + >>> irm.predict(5) + 16.5 + >>> irm.predict(sc.parallelize([3, 5])).collect() + [2.0, 16.5] >>> import os, tempfile >>> path = tempfile.mkdtemp() >>> irm.save(sc, path) >>> sameModel = IsotonicRegressionModel.load(sc, path) - >>> sameModel.predict(1.5) + >>> sameModel.predict(3) 2.0 - >>> sameModel.predict(2.5) - 4.5 - >>> sameModel.predict(4) - 6.0 + >>> sameModel.predict(5) + 16.5 >>> try: ... os.removedirs(path) ... except OSError: From 7f202f964ca6295b64ab7647156cd82fa2e03c51 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 5 May 2015 14:39:27 -0700 Subject: [PATCH 5/5] use Vector to have the best Python 2&3 compatibility --- .../spark/mllib/api/python/PythonMLLibAPI.scala | 8 +++++--- .../spark/mllib/regression/IsotonicRegression.scala | 11 +++++++++-- python/pyspark/mllib/regression.py | 11 +++++------ 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 84cff9bbf61d1..e03b784a368f7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -289,10 +289,12 @@ private[python] class PythonMLLibAPI extends Serializable { data: JavaRDD[Vector], isotonic: Boolean): JList[Object] = { val isotonicRegressionAlg = new IsotonicRegression().setIsotonic(isotonic) + val input = data.rdd.map { x => + (x(0), x(1), x(2)) + }.persist(StorageLevel.MEMORY_AND_DISK) try { - val model = isotonicRegressionAlg.run(data.rdd.map(_.toArray).map { - x => (x(0), x(1), x(2)) }.persist(StorageLevel.MEMORY_AND_DISK)) - List(model.boundaries, model.predictions).map(_.asInstanceOf[Object]).asJava + val model = isotonicRegressionAlg.run(input) + List[AnyRef](model.boundaryVector, model.predictionVector).asJava } finally { data.rdd.unpersist(blocking = false) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 15a0760f5e6c1..be2a00c2dfea4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -28,12 +28,13 @@ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ +import org.apache.spark.SparkContext import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD} +import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext -import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.SQLContext /** * :: Experimental :: @@ -140,6 +141,12 @@ class IsotonicRegressionModel ( } } + /** A convenient method for boundaries called by the Python API. */ + private[mllib] def boundaryVector: Vector = Vectors.dense(boundaries) + + /** A convenient method for boundaries called by the Python API. */ + private[mllib] def predictionVector: Vector = Vectors.dense(predictions) + override def save(sc: SparkContext, path: String): Unit = { IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, boundaries, predictions, isotonic) } diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 8d6e3d85a4341..41bde2ce3e60b 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -445,17 +445,16 @@ def save(self, sc, path): def load(cls, sc, path): java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load( sc._jsc.sc(), path) - py_boundaries = _java2py(sc, java_model.boundaries()) - py_predictions = _java2py(sc, java_model.predictions()) - return IsotonicRegressionModel(np.array(py_boundaries), - np.array(py_predictions), java_model.isotonic) + py_boundaries = _java2py(sc, java_model.boundaryVector()).toArray() + py_predictions = _java2py(sc, java_model.predictionVector()).toArray() + return IsotonicRegressionModel(py_boundaries, py_predictions, java_model.isotonic) class IsotonicRegression(object): """ Run IsotonicRegression algorithm to obtain isotonic regression model. - :param data: RDD of data points + :param data: RDD of (label, feature, weight) tuples. :param isotonic: Whether this is isotonic or antitonic. """ @classmethod @@ -463,7 +462,7 @@ def train(cls, data, isotonic=True): """Train a isotonic regression model on the given data.""" boundaries, predictions = callMLlibFunc("trainIsotonicRegressionModel", data.map(_convert_to_vector), bool(isotonic)) - return IsotonicRegressionModel(np.array(boundaries), np.array(predictions), isotonic) + return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic) def _test():