From 19aa5236a20aa2ad7479ed7fdd6db95cc6a6b13f Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 7 May 2014 13:22:13 -0700 Subject: [PATCH 01/26] update toString and add parsers for Vectors and LabeledPoint --- .../apache/spark/mllib/linalg/Vectors.scala | 33 +++++++++++++++++- .../spark/mllib/regression/LabeledPoint.scala | 23 +++++++++++-- .../spark/mllib/linalg/VectorsSuite.scala | 23 +++++++++++++ .../mllib/regression/LabeledPointSuite.scala | 34 +++++++++++++++++++ 4 files changed, 110 insertions(+), 3 deletions(-) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 7cdf6bd56acd..fa260ba3d384 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -22,6 +22,7 @@ import java.util.Arrays import scala.annotation.varargs import scala.collection.JavaConverters._ +import scala.util.parsing.combinator.JavaTokenParsers import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV} @@ -124,6 +125,8 @@ object Vectors { }.toSeq) } + private[mllib] def parse(s: String): Vector = VectorParsers(s) + /** * Creates a vector instance from a breeze vector. */ @@ -171,8 +174,11 @@ class SparseVector( val indices: Array[Int], val values: Array[Double]) extends Vector { + require(indices.length == values.length) + override def toString: String = { - "(" + size + "," + indices.zip(values).mkString("[", "," ,"]") + ")" + Seq(size, indices.mkString("[", ",", "]"), values.mkString("[", ",", "]")) + .mkString("(", ",", ")") } override def toArray: Array[Double] = { @@ -188,3 +194,28 @@ class SparseVector( private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, size) } + +/** + * Parsers for string representation of [[org.apache.spark.mllib.linalg.Vector]]. + */ +private[mllib] class VectorParsers extends JavaTokenParsers { + lazy val indices: Parser[Array[Int]] = "[" ~ repsep(wholeNumber, ",") ~ "]" ^^ { + case "[" ~ ii ~ "]" => ii.map(_.toInt).toArray + } + lazy val values: Parser[Array[Double]] = "[" ~ repsep(floatingPointNumber, ",") ~ "]" ^^ { + case "[" ~ vv ~ "]" => vv.map(_.toDouble).toArray + } + lazy val denseVector: Parser[DenseVector] = values ^^ { + case vv => new DenseVector(vv) + } + lazy val sparseVector: Parser[SparseVector] = + "(" ~ wholeNumber ~ "," ~ indices ~ "," ~ values ~ ")" ^^ { + case "(" ~ size ~ "," ~ ii ~ "," ~ vv ~ ")" => new SparseVector(size.toInt, ii, vv) + } + lazy val vector: Parser[Vector] = denseVector | sparseVector +} + +private[mllib] object VectorParsers extends VectorParsers { + /** Parses a string into an [[org.apache.spark.mllib.linalg.Vector]]. */ + def apply(s: String): Vector = parse(vector, s).get +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index 3deab1ab785b..7d9e1e90ad39 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.regression -import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.linalg.{Vector, VectorParsers} /** * Class that represents the features and labels of a data point. 
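For orientation, the string formats this first patch introduces are the ones exercised by the tests further down: dense vectors print as `[v0,v1,...,vn]` and sparse vectors as `(size,[indices],[values])`. A minimal, illustrative sketch of the intended round trip (note that `Vectors.parse` is still `private[mllib]` in this commit and is only promoted to a public method later in the series):

    import org.apache.spark.mllib.linalg.Vectors

    // dense format: "[v0,v1,...,vn]"
    val dense = Vectors.parse("[1.0,0.0,-2.0]")
    // sparse format: "(size,[indices],[values])", as produced by the new SparseVector#toString
    val sparse = Vectors.parse("(3,[0,2],[1.0,-2.0])")
    // toString and parse are meant to be inverses, which is what the new VectorsSuite test checks
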
@@ -27,6 +27,25 @@ import org.apache.spark.mllib.linalg.Vector */ case class LabeledPoint(label: Double, features: Vector) { override def toString: String = { - "LabeledPoint(%s, %s)".format(label, features) + Seq(label, features).mkString("(", ",", ")") } } + +object LabeledPoint { + /** Parses a string into an [[org.apache.spark.mllib.regression.LabeledPoint]]. */ + def parse(s: String) = LabeledPointParsers.parse(s) +} + +/** + * Parsers for string representation of [[org.apache.spark.mllib.regression.LabeledPoint]]. + */ +private[mllib] class LabeledPointParsers extends VectorParsers { + lazy val labeledPoint: Parser[LabeledPoint] = "(" ~ floatingPointNumber ~ "," ~ vector ~ ")" ^^ { + case "(" ~ l ~ "," ~ v ~ ")" => LabeledPoint(l.toDouble, v) + } +} + +private[mllib] object LabeledPointParsers extends LabeledPointParsers { + /** Parses a string into an [[org.apache.spark.mllib.regression.LabeledPoint]]. */ + def parse(s: String): LabeledPoint = parse(labeledPoint, s).get +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index cfe8a27fcb71..676e17cdae25 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -100,4 +100,27 @@ class VectorsSuite extends FunSuite { assert(vec2(6) === 4.0) assert(vec2(7) === 0.0) } + + test("parse vectors") { + val vectors = Seq( + Vectors.dense(Array.empty[Double]), + Vectors.dense(1.0), + Vectors.dense(1.0, 0.0, -2.0), + Vectors.sparse(0, Array.empty[Int], Array.empty[Double]), + Vectors.sparse(1, Array(0), Array(1.0)), + Vectors.sparse(3, Array(0, 2), Array(1.0, -2.0))) + vectors.foreach { v => + val v1 = Vectors.parse(v.toString) + assert(v.getClass === v1.getClass) + assert(v === v1) + } + + val malformatted = Seq("1", "[1,]", "[1,2", "(1,[1,2])", "(1,[1],[2.0,1.0])") + malformatted.foreach { s => + intercept[RuntimeException] { + Vectors.parse(s) + println(s"Didn't detect malformatted string $s.") + } + } + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala new file mode 100644 index 000000000000..9946ff650c51 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.regression + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.Vectors + +class LabeledPointSuite extends FunSuite { + + test("parse labeled points") { + val points = Seq( + LabeledPoint(1.0, Vectors.dense(1.0, 0.0)), + LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0)))) + points.foreach { p => + assert(p === LabeledPoint.parse(p.toString)) + } + } +} From 9e63a02b5fcf38c00b1634730e2f385961d0f326 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 7 May 2014 13:53:44 -0700 Subject: [PATCH 02/26] add loadVectors and loadLabeledPoints --- .../apache/spark/mllib/linalg/Vectors.scala | 8 +++-- .../spark/mllib/regression/LabeledPoint.scala | 5 ++- .../org/apache/spark/mllib/util/MLUtils.scala | 33 +++++++++++++++++++ .../spark/mllib/util/MLUtilsSuite.scala | 31 +++++++++++++++++ 4 files changed, 74 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index fa260ba3d384..60eca98004b6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -125,7 +125,11 @@ object Vectors { }.toSeq) } - private[mllib] def parse(s: String): Vector = VectorParsers(s) + /** + * Parses a string resulted from `Vector#toString` into + * an [[org.apache.spark.mllib.linalg.Vector]]. + */ + def parse(s: String): Vector = VectorParsers.parse(s) /** * Creates a vector instance from a breeze vector. @@ -217,5 +221,5 @@ private[mllib] class VectorParsers extends JavaTokenParsers { private[mllib] object VectorParsers extends VectorParsers { /** Parses a string into an [[org.apache.spark.mllib.linalg.Vector]]. */ - def apply(s: String): Vector = parse(vector, s).get + def parse(s: String): Vector = parse(vector, s).get } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index 7d9e1e90ad39..9ad5dad6da9b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -32,7 +32,10 @@ case class LabeledPoint(label: Double, features: Vector) { } object LabeledPoint { - /** Parses a string into an [[org.apache.spark.mllib.regression.LabeledPoint]]. */ + /** + * Parses a string resulted from `LabeledPoint#toString` into + * an [[org.apache.spark.mllib.regression.LabeledPoint]]. + */ def parse(s: String) = LabeledPointParsers.parse(s) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index e598b6cb171a..8d645096ca6d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -179,6 +179,39 @@ object MLUtils { dataStr.saveAsTextFile(dir) } + /** + * Loads vectors saved using `RDD[Vector]#saveAsTextFile`. + * @param sc Spark context + * @param path file or directory path in any Hadoop-supported file system URI + * @param minPartitions min number of partitions + * @return vectors stored as an RDD[Vector] + */ + def loadVectors(sc: SparkContext, path: String, minPartitions: Int): RDD[Vector] = + sc.textFile(path, minPartitions).map(Vectors.parse) + + /** + * Loads vectors saved using `RDD[Vector]#saveAsTextFile` with the default number of partitions. 
+ */ + def loadVectors(sc: SparkContext, path: String): RDD[Vector] = + sc.textFile(path, sc.defaultMinPartitions).map(Vectors.parse) + + /** + * Loads labeled points saved using `RDD[LabeledPoint]#saveAsTextFile`. + * @param sc Spark context + * @param path file or directory path in any Hadoop-supported file system URI + * @param minPartitions min number of partitions + * @return labeled points stored as an RDD[LabeledPoint] + */ + def loadLabeledPoints(sc: SparkContext, path: String, minPartitions: Int): RDD[LabeledPoint] = + sc.textFile(path, minPartitions).map(LabeledPoint.parse) + + /** + * Loads labeled points saved using `RDD[LabeledPoint]#saveAsTextFile` with the default number of + * partitions. + */ + def loadLabeledPoints(sc: SparkContext, dir: String): RDD[LabeledPoint] = + loadLabeledPoints(sc, dir, sc.defaultMinPartitions) + /** * :: Experimental :: * Load labeled data from a file. The data format used here is diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index 3f64baf6fe41..83c9b8b5b21b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -158,6 +158,37 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { } } + test("loadVectors") { + val vectors = sc.parallelize(Seq( + Vectors.dense(1.0, 2.0), + Vectors.sparse(2, Array(1), Array(-1.0)), + Vectors.dense(0.0, 1.0) + ), 2) + val tempDir = Files.createTempDir() + val outputDir = new File(tempDir, "vectors") + val path = outputDir.toURI.toString + vectors.saveAsTextFile(path) + val loaded = loadVectors(sc, outputDir.toURI.toString) + assert(vectors.collect().toSet === loaded.collect().toSet) + deleteQuietly(tempDir) + } + + test("loadLabeledPoints") { + val points = sc.parallelize(Seq( + LabeledPoint(1.0, Vectors.dense(1.0, 2.0)), + LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0))), + LabeledPoint(1.0, Vectors.dense(0.0, 1.0)) + ), 2) + val tempDir = Files.createTempDir() + val outputDir = new File(tempDir, "points") + val path = outputDir.toURI.toString + println(path) + points.saveAsTextFile(path) + val loaded = loadLabeledPoints(sc, path) + assert(points.collect().toSet === loaded.collect().toSet) + deleteQuietly(tempDir) + } + /** Delete a file/directory quietly. 
*/ def deleteQuietly(f: File) { if (f.isDirectory) { From e761d32f7ee62107bcef17fe7491ab086641ac5b Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 7 May 2014 14:52:17 -0700 Subject: [PATCH 03/26] make LabelPoint.parse compatible with the dense format used before v1.0 and deprecate loadLabeledData and saveLabeledData --- .../examples/mllib/DecisionTreeRunner.scala | 2 +- .../spark/mllib/regression/LabeledPoint.scala | 16 ++++++++++++---- .../spark/mllib/util/LinearDataGenerator.scala | 3 ++- .../util/LogisticRegressionDataGenerator.scala | 3 ++- .../org/apache/spark/mllib/util/MLUtils.scala | 12 ++++++++---- .../spark/mllib/util/SVMDataGenerator.scala | 2 +- .../mllib/regression/LabeledPointSuite.scala | 5 +++++ 7 files changed, 31 insertions(+), 12 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala index 0bd847d7bab3..4ab7c4c0211c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala @@ -99,7 +99,7 @@ object DecisionTreeRunner { val sc = new SparkContext(conf) // Load training data and cache it. - val examples = MLUtils.loadLabeledData(sc, params.input).cache() + val examples = MLUtils.loadLabeledPoints(sc, params.input).cache() val splits = examples.randomSplit(Array(0.8, 0.2)) val training = splits(0).cache() diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index 9ad5dad6da9b..d64869057b93 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.regression -import org.apache.spark.mllib.linalg.{Vector, VectorParsers} +import org.apache.spark.mllib.linalg.{Vectors, Vector, VectorParsers} /** * Class that represents the features and labels of a data point. @@ -43,9 +43,17 @@ object LabeledPoint { * Parsers for string representation of [[org.apache.spark.mllib.regression.LabeledPoint]]. */ private[mllib] class LabeledPointParsers extends VectorParsers { - lazy val labeledPoint: Parser[LabeledPoint] = "(" ~ floatingPointNumber ~ "," ~ vector ~ ")" ^^ { - case "(" ~ l ~ "," ~ v ~ ")" => LabeledPoint(l.toDouble, v) - } + /** Parser for the dense format used before v1.0. */ + lazy val labeledPointV0: Parser[LabeledPoint] = + floatingPointNumber ~ "," ~ rep(floatingPointNumber) ^^ { + case l ~ "," ~ vv => LabeledPoint(l.toDouble, Vectors.dense(vv.map(_.toDouble).toArray)) + } + /** Parser for strings resulted from `LabeledPoint#toString` in v1.0. 
*/ + lazy val labeledPointV1: Parser[LabeledPoint] = + "(" ~ floatingPointNumber ~ "," ~ vector ~ ")" ^^ { + case "(" ~ l ~ "," ~ v ~ ")" => LabeledPoint(l.toDouble, v) + } + lazy val labeledPoint: Parser[LabeledPoint] = labeledPointV1 | labeledPointV0 } private[mllib] object LabeledPointParsers extends LabeledPointParsers { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala index c8e160d00c2d..69299c219878 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala @@ -129,7 +129,8 @@ object LinearDataGenerator { val sc = new SparkContext(sparkMaster, "LinearDataGenerator") val data = generateLinearRDD(sc, nexamples, nfeatures, eps, nparts = parts) - MLUtils.saveLabeledData(data, outputPath) + data.saveAsTextFile(outputPath) + sc.stop() } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala index c82cd8fd4641..9d802678c4a7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala @@ -79,7 +79,8 @@ object LogisticRegressionDataGenerator { val sc = new SparkContext(sparkMaster, "LogisticRegressionDataGenerator") val data = generateLogisticRDD(sc, nexamples, nfeatures, eps, parts) - MLUtils.saveLabeledData(data, outputPath) + data.saveAsTextFile(outputPath) + sc.stop() } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 8d645096ca6d..123f924145e1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -213,7 +213,6 @@ object MLUtils { loadLabeledPoints(sc, dir, sc.defaultMinPartitions) /** - * :: Experimental :: * Load labeled data from a file. The data format used here is * , ... * where , are feature values in Double and is the corresponding label as Double. @@ -222,8 +221,11 @@ object MLUtils { * @param dir Directory to the input data files. * @return An RDD of LabeledPoint. Each labeled point has two elements: the first element is * the label, and the second element represents the feature values (an array of Double). + * + * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and + * [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading. */ - @Experimental + @deprecated("Should use RDD#saveAsTextFile and MLUtils#loadLabeledPoints instead.", "1.0") def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = { sc.textFile(dir).map { line => val parts = line.split(',') @@ -234,15 +236,17 @@ object MLUtils { } /** - * :: Experimental :: * Save labeled data to a file. The data format used here is * , ... * where , are feature values in Double and is the corresponding label as Double. * * @param data An RDD of LabeledPoints containing data to be saved. * @param dir Directory to save the data. + * + * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and + * [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading. 
*/ - @Experimental + @deprecated("Should use RDD#saveAsTextFile and MLUtils#loadLabeledPoints instead.", "1.0") def saveLabeledData(data: RDD[LabeledPoint], dir: String) { val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" ")) dataStr.saveAsTextFile(dir) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala index ba8190b0e07e..7db97e6bac68 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala @@ -65,7 +65,7 @@ object SVMDataGenerator { LabeledPoint(y, Vectors.dense(x)) } - MLUtils.saveLabeledData(data, outputPath) + data.saveAsTextFile(outputPath) sc.stop() } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala index 9946ff650c51..110c44a7193f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala @@ -31,4 +31,9 @@ class LabeledPointSuite extends FunSuite { assert(p === LabeledPoint.parse(p.toString)) } } + + test("parse labeled points with v0.9 format") { + val point = LabeledPoint.parse("1.0,1.0 0.0 -2.0") + assert(point === LabeledPoint(1.0, Vectors.dense(1.0, 0.0, -2.0))) + } } From 7853f887e15044ee65e5c249f376789afbdf9ad2 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 7 May 2014 15:13:00 -0700 Subject: [PATCH 04/26] update pyspark's SparseVector.__str__ --- python/pyspark/mllib/linalg.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 0aa3a51de706..3d55098efc56 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -43,11 +43,11 @@ def __init__(self, size, *args): or two sorted lists containing indices and values. >>> print SparseVector(4, {1: 1.0, 3: 5.5}) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) >>> print SparseVector(4, [(1, 1.0), (3, 5.5)]) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) >>> print SparseVector(4, [1, 3], [1.0, 5.5]) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) """ assert type(size) == int, "first argument must be an int" self.size = size @@ -161,10 +161,9 @@ def squared_distance(self, other): return result def __str__(self): - inds = self.indices - vals = self.values - entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))]) - return "[" + entries + "]" + inds = "[" + ",".join([str(i) for i in self.indices]) + "]" + vals = "[" + ",".join([str(v) for v in self.values]) + "]" + return "(" + ",".join((str(self.size), inds, vals)) + ")" def __repr__(self): inds = self.indices @@ -215,11 +214,11 @@ def sparse(size, *args): or two sorted lists containing indices and values. 
>>> print Vectors.sparse(4, {1: 1.0, 3: 5.5}) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)]) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) >>> print Vectors.sparse(4, [1, 3], [1.0, 5.5]) - [1: 1.0, 3: 5.5] + (4,[1,3],[1.0,5.5]) """ return SparseVector(size, *args) From 5c2dbfa1b6e43267b5bbb179093729de3b9ac42e Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 7 May 2014 15:28:54 -0700 Subject: [PATCH 05/26] add parse to pyspark's Vectors --- python/pyspark/mllib/linalg.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 3d55098efc56..c670a9c3f487 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -233,6 +233,26 @@ def dense(elements): """ return array(elements, dtype=float64) + @staticmethod + def parse(s): + """ + Parses a string resulted from str() into a vector. + + >>> Vectors.parse("[0.0,1.0]") + array([ 0., 1.]) + >>> print Vectors.parse("(2,[1],[1.0])") + (2,[1],[1.0]) + """ + return Vectors._parse_structured(eval(s)) + + @staticmethod + def _parse_structured(data): + if type(data) == list: + return Vectors.dense(data) + elif type(data) == tuple: + return Vectors.sparse(data[0], data[1], data[2]) + else: + raise SyntaxError("Cannot recognize " + data) def _test(): import doctest From a7a178e773851913b894e975f7afbe315cee3199 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 7 May 2014 15:34:25 -0700 Subject: [PATCH 06/26] add stringify to pyspark's Vectors --- python/pyspark/mllib/linalg.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index c670a9c3f487..fbccdedf2759 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -236,7 +236,7 @@ def dense(elements): @staticmethod def parse(s): """ - Parses a string resulted from str() into a vector. + Parses a string resulted from Vectors.stringify() into a vector. >>> Vectors.parse("[0.0,1.0]") array([ 0., 1.]) @@ -254,6 +254,22 @@ def _parse_structured(data): else: raise SyntaxError("Cannot recognize " + data) + @staticmethod + def stringify(vector): + """ + Converts a vector into a string, which can be recognized by + Vectors.parse(). 
+ + >>> Vectors.stringify(Vectors.sparse(2, [1], [1.0])) + '(2,[1],[1.0])' + >>> Vectors.stringify(Vectors.dense([0.0, 1.0])) + '[0.0,1.0]' + """ + if type(vector) == SparseVector: + return str(vector) + else: + return "[" + ",".join([str(v) for v in vector]) + "]" + def _test(): import doctest (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS) From cd6c78fdc83c9c64bc74b0e5766f79528ca6275c Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 7 May 2014 15:46:44 -0700 Subject: [PATCH 07/26] add __str__ and parse to LabeledPoint --- python/pyspark/mllib/linalg.py | 3 +++ python/pyspark/mllib/regression.py | 22 +++++++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index fbccdedf2759..1258e06aac60 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -233,6 +233,7 @@ def dense(elements): """ return array(elements, dtype=float64) + @staticmethod def parse(s): """ @@ -245,6 +246,7 @@ def parse(s): """ return Vectors._parse_structured(eval(s)) + @staticmethod def _parse_structured(data): if type(data) == list: @@ -254,6 +256,7 @@ def _parse_structured(data): else: raise SyntaxError("Cannot recognize " + data) + @staticmethod def stringify(vector): """ diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 266b31d3fab0..7dcf98bfb31e 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -23,7 +23,7 @@ _serialize_double_vector, _deserialize_double_vector, \ _get_initial_weights, _serialize_rating, _regression_train_wrapper, \ _linear_predictor_typecheck, _have_scipy, _scipy_issparse -from pyspark.mllib.linalg import SparseVector +from pyspark.mllib.linalg import SparseVector, Vectors class LabeledPoint(object): @@ -44,6 +44,26 @@ def __init__(self, label, features): else: raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix") + def __str__(self): + return "(" + ",".join((str(self.label), Vectors.stringify(self.features))) + ")" + + + @staticmethod + def parse(s): + """ + Parses a string resulted from str() to a LabeledPoint. 
+ + >>> print LabeledPoint.parse("(1.0,[0.0,1.0])") + (1.0,[0.0,1.0]) + >>> print LabeledPoint.parse("(1.0,(2,[1],[1.0]))") + (1.0,(2,[1],[1.0])) + """ + return LabeledPoint._parse_structured(eval(s)) + + + @staticmethod + def _parse_structured(data): + return LabeledPoint(data[0], Vectors._parse_structured(data[1])) class LinearModel(object): """A linear model that has a vector of coefficients and an intercept.""" From 63dc396a1516e17549fa4ce6e153bd3e68bbb18c Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 7 May 2014 17:24:59 -0700 Subject: [PATCH 08/26] add loadLabeledPoints to pyspark --- python/pyspark/mllib/util.py | 61 +++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index 50d0cdd08762..d7ab39001320 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -106,24 +106,18 @@ def loadLibSVMFile(sc, path, multiclass=False, numFeatures=-1, minPartitions=Non >>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect() >>> multiclass_examples = MLUtils.loadLibSVMFile(sc, tempFile.name, True).collect() >>> tempFile.close() - >>> examples[0].label - 1.0 - >>> examples[0].features.size - 6 - >>> print examples[0].features - [0: 1.0, 2: 2.0, 4: 3.0] - >>> examples[1].label - 0.0 - >>> examples[1].features.size - 6 - >>> print examples[1].features - [] - >>> examples[2].label - 0.0 - >>> examples[2].features.size - 6 - >>> print examples[2].features - [1: 4.0, 3: 5.0, 5: 6.0] + >>> type(examples[0]) == LabeledPoint + True + >>> print examples[0] + (1.0,(6,[0,2,4],[1.0,2.0,3.0])) + >>> type(examples[1]) == LabeledPoint + True + >>> print examples[1] + (0.0,(6,[],[])) + >>> type(examples[2]) == LabeledPoint + True + >>> print examples[2] + (0.0,(6,[1,3,5],[4.0,5.0,6.0])) >>> multiclass_examples[1].label -1.0 """ @@ -160,6 +154,37 @@ def saveAsLibSVMFile(data, dir): lines.saveAsTextFile(dir) + @staticmethod + def loadLabeledPoints(sc, path, minPartitions=None): + """ + Load labeled points saved using RDD.saveAsTextFile. 
+ + @param sc: Spark context + @param path: file or directory path in any Hadoop-supported file + system URI + @param minPartitions: min number of partitions + @return: labeled data stored as an RDD of LabeledPoint + + >>> from tempfile import NamedTemporaryFile + >>> from pyspark.mllib.util import MLUtils + >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), \ + LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))] + >>> tempFile = NamedTemporaryFile(delete=True) + >>> tempFile.close() + >>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name) + >>> loaded = MLUtils.loadLabeledPoints(sc, tempFile.name).collect() + >>> type(loaded[0]) == LabeledPoint + True + >>> print examples[0] + (1.1,(3,[0,2],[1.23,4.56])) + >>> type(examples[1]) == LabeledPoint + True + >>> print examples[1] + (0.0,[1.01,2.02,3.03]) + + """ + return sc.textFile(path, minPartitions).map(LabeledPoint.parse) + def _test(): import doctest from pyspark.context import SparkContext From d731817b03ab75a9cf3ca72b9e7bff6d7a8bef47 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 7 May 2014 17:36:12 -0700 Subject: [PATCH 09/26] style update --- .../test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala | 1 - python/pyspark/mllib/util.py | 1 - 2 files changed, 2 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index 83c9b8b5b21b..966d67d0a8ce 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -182,7 +182,6 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { val tempDir = Files.createTempDir() val outputDir = new File(tempDir, "points") val path = outputDir.toURI.toString - println(path) points.saveAsTextFile(path) val loaded = loadLabeledPoints(sc, path) assert(points.collect().toSet === loaded.collect().toSet) diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index d7ab39001320..48a79e8e1bce 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -181,7 +181,6 @@ def loadLabeledPoints(sc, path, minPartitions=None): True >>> print examples[1] (0.0,[1.01,2.02,3.03]) - """ return sc.textFile(path, minPartitions).map(LabeledPoint.parse) From b0c50cb505287fb515870fabfeecf75b876bd66c Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 8 May 2014 16:21:13 -0700 Subject: [PATCH 10/26] add customized parser --- .../apache/spark/mllib/linalg/Vectors.scala | 21 ++- .../spark/mllib/regression/LabeledPoint.scala | 17 ++- .../spark/mllib/util/NumericParser.scala | 139 ++++++++++++++++++ .../spark/mllib/linalg/VectorsSuite.scala | 2 +- .../spark/mllib/util/NumericParserSuite.scala | 34 +++++ 5 files changed, 208 insertions(+), 5 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 60eca98004b6..909c77b98f63 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -17,14 +17,16 @@ package org.apache.spark.mllib.linalg -import java.lang.{Iterable => JavaIterable, Integer => JavaInteger, Double => JavaDouble} +import java.lang.{Double => 
JavaDouble, Integer => JavaInteger, Iterable => JavaIterable} import java.util.Arrays import scala.annotation.varargs import scala.collection.JavaConverters._ import scala.util.parsing.combinator.JavaTokenParsers -import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV} +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV} + +import org.apache.spark.mllib.util.NumericParser /** * Represents a numeric vector, whose index type is Int and value type is Double. @@ -129,7 +131,20 @@ object Vectors { * Parses a string resulted from `Vector#toString` into * an [[org.apache.spark.mllib.linalg.Vector]]. */ - def parse(s: String): Vector = VectorParsers.parse(s) + def parse(s: String): Vector = { + parseNumeric(NumericParser.parse(s)) + } + + private[mllib] def parseNumeric(any: Any): Vector = { + any match { + case values: Array[Double] => + Vectors.dense(values) + case Seq(size: Double, indices: Array[Double], values: Array[Double]) => + Vectors.sparse(size.toInt, indices.map(_.toInt), values) + case other => + sys.error(s"Cannot parse $other.") + } + } /** * Creates a vector instance from a breeze vector. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index d64869057b93..944380319b93 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.regression import org.apache.spark.mllib.linalg.{Vectors, Vector, VectorParsers} +import org.apache.spark.mllib.util.NumericParser /** * Class that represents the features and labels of a data point. @@ -36,7 +37,21 @@ object LabeledPoint { * Parses a string resulted from `LabeledPoint#toString` into * an [[org.apache.spark.mllib.regression.LabeledPoint]]. */ - def parse(s: String) = LabeledPointParsers.parse(s) + def parse(s: String): LabeledPoint = { + if (s.startsWith("(") || s.startsWith(")")) { + NumericParser.parse(s) match { + case Seq(label: Double, numeric: Any) => + LabeledPoint(label, Vectors.parseNumeric(numeric)) + case other => + sys.error(s"Cannot parse $other.") + } + } else { + val parts = s.split(',') + val label = parts(0).toDouble + val features = Vectors.dense(parts(1).trim().split(' ').map(_.toDouble)) + LabeledPoint(label, features) + } + } } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala new file mode 100644 index 000000000000..b011314e9e24 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala @@ -0,0 +1,139 @@ +package org.apache.spark.mllib.util + +import scala.collection.mutable.{ArrayBuffer, ListBuffer} + +object NumericTokenizer { + val NUMBER = -1 + val END = -2 +} + +import NumericTokenizer._ + +/** + * Simple tokenizer for a numeric structure consisting of three types: + * + * - number: a double in Java's floating number format + * - array: an array of numbers stored as `[v0,v1,...,vn]` + * - tuple: a list of numbers, arrays, or tuples stored as `(...)` + * + @param s input string + * @param start start index + * @param end end index + */ +private[mllib] class NumericTokenizer(s: String, start: Int, end: Int) { + + /** + * Creates a tokenizer for the entire input string. 
+ */ + def this(s: String) = this(s, 0, s.length) + + private var cur = start + private var allowComma = false + private var _value = Double.NaN + + /** + * Returns the most recent parsed number. + */ + def value: Double = _value + + /** + * Returns the next token, which could be any of the following: + * - '[', ']', '(', or ')'. + * - [[org.apache.spark.mllib.util.NumericTokenizer#NUMBER]], call value() to get its value. + * - [[org.apache.spark.mllib.util.NumericTokenizer#END]]. + */ + def next(): Int = { + if (cur < end) { + val c = s(cur) + if (c == ',' && allowComma) { + cur += 1 + allowComma = false + return next() + } + + c match { + case '(' | '[' => + allowComma = false + cur += 1 + c + case ')' | ']' => + allowComma = true + cur += 1 + c + case ',' => + if (allowComma) { + cur += 1 + allowComma = false + next() + } else { + sys.error("Found a ',' at a wrong location.") + } + case other => // expecting a number + var inNumber = true + val sb = new StringBuilder() + while (cur < end && inNumber) { + val d = s(cur) + if (d == ')' || d == ']' || d == ',') { + inNumber = false + } else { + sb.append(d) + cur += 1 + } + } + _value = sb.toString().toDouble + allowComma = true + NUMBER + } + } else { + END + } + } +} + +private[mllib] object NumericParser { + + /** Parses a string into a Double, an Array[Double], or a Seq[Any]. */ + def parse(s: String): Any = parse(new NumericTokenizer(s)) + + private def parse(tokenizer: NumericTokenizer): Any = { + tokenizer.next() match { + case '(' => + parseTuple(tokenizer) + case '[' => + parseArray(tokenizer) + case NUMBER => + tokenizer.value + case END => + null + } + } + + private def parseArray(tokenizer: NumericTokenizer): Array[Double] = { + val values = ArrayBuffer.empty[Double] + var token = tokenizer.next() + while (token == NUMBER) { + values.append(tokenizer.value) + token = tokenizer.next() + } + require(token == ']') + values.toArray + } + + private def parseTuple(tokenizer: NumericTokenizer): List[_] = { + val items = ListBuffer.empty[Any] + var token = tokenizer.next() + while (token != ')' && token != END) { + token match { + case '(' => + items.append(parseTuple(tokenizer)) + case '[' => + items.append(parseArray(tokenizer)) + case NUMBER => + items.append(tokenizer.value) + } + token = tokenizer.next() + } + require(token == ')') + items.toList + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 676e17cdae25..53594694ff18 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -115,7 +115,7 @@ class VectorsSuite extends FunSuite { assert(v === v1) } - val malformatted = Seq("1", "[1,]", "[1,2", "(1,[1,2])", "(1,[1],[2.0,1.0])") + val malformatted = Seq("1", "[1,,]", "[1,2", "(1,[1,2])", "(1,[1],[2.0,1.0])") malformatted.foreach { s => intercept[RuntimeException] { Vectors.parse(s) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala new file mode 100644 index 000000000000..fee28a1ffcb4 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala @@ -0,0 +1,34 @@ +package org.apache.spark.mllib.util + +import org.scalatest.FunSuite +import scala.collection.mutable.ListBuffer + +class NumericParserSuite extends FunSuite { + + test("tokenizer") { + val s = 
"((1,2),4,[5,6],8)" + val tokenizer = new NumericTokenizer(s) + var token = tokenizer.next() + val tokens = ListBuffer.empty[Any] + while (token != NumericTokenizer.END) { + token match { + case NumericTokenizer.NUMBER => + tokens.append(tokenizer.value) + case other => + tokens.append(token) + } + token = tokenizer.next() + } + val expected = Seq('(', '(', 1.0, 2.0, ')', 4.0, '[', 5.0, 6.0, ']', 8.0, ')') + assert(expected === tokens) + } + + test("parser") { + val s = "((1,2),4,[5,6],8)" + val parsed = NumericParser.parse(s).asInstanceOf[Seq[_]] + assert(parsed(0).asInstanceOf[Seq[_]] === Seq(1.0, 2.0)) + assert(parsed(1).asInstanceOf[Double] === 4.0) + assert(parsed(2).asInstanceOf[Array[Double]] === Array(5.0, 6.0)) + assert(parsed(3).asInstanceOf[Double] === 8.0) + } +} From c1885c13af4ec15126d2d84c196f0c3e282027d0 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 8 May 2014 17:13:50 -0700 Subject: [PATCH 11/26] add headers and minor changes --- .../spark/mllib/util/NumericParser.scala | 34 ++++++++++++----- .../spark/mllib/util/NumericParserSuite.scala | 37 +++++++++++++++++-- 2 files changed, 58 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala index b011314e9e24..386da3b6f080 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala @@ -1,8 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.mllib.util import scala.collection.mutable.{ArrayBuffer, ListBuffer} -object NumericTokenizer { +private[mllib] object NumericTokenizer { val NUMBER = -1 val END = -2 } @@ -16,7 +33,7 @@ import NumericTokenizer._ * - array: an array of numbers stored as `[v0,v1,...,vn]` * - tuple: a list of numbers, arrays, or tuples stored as `(...)` * - @param s input string + * @param s input string * @param start start index * @param end end index */ @@ -45,12 +62,6 @@ private[mllib] class NumericTokenizer(s: String, start: Int, end: Int) { def next(): Int = { if (cur < end) { val c = s(cur) - if (c == ',' && allowComma) { - cur += 1 - allowComma = false - return next() - } - c match { case '(' | '[' => allowComma = false @@ -90,6 +101,9 @@ private[mllib] class NumericTokenizer(s: String, start: Int, end: Int) { } } +/** + * Simple parser for tokens from [[org.apache.spark.mllib.util.NumericTokenizer]]. + */ private[mllib] object NumericParser { /** Parses a string into a Double, an Array[Double], or a Seq[Any]. 
*/ @@ -119,7 +133,7 @@ private[mllib] object NumericParser { values.toArray } - private def parseTuple(tokenizer: NumericTokenizer): List[_] = { + private def parseTuple(tokenizer: NumericTokenizer): Seq[_] = { val items = ListBuffer.empty[Any] var token = tokenizer.next() while (token != ')' && token != END) { @@ -134,6 +148,6 @@ private[mllib] object NumericParser { token = tokenizer.next() } require(token == ')') - items.toList + items.toSeq } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala index fee28a1ffcb4..76c67f77d4f9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala @@ -1,12 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.mllib.util -import org.scalatest.FunSuite import scala.collection.mutable.ListBuffer +import org.scalatest.FunSuite + class NumericParserSuite extends FunSuite { test("tokenizer") { - val s = "((1,2),4,[5,6],8)" + val s = "((1.0,2e3),-4,[5e-6,7.0E8],+9)" val tokenizer = new NumericTokenizer(s) var token = tokenizer.next() val tokens = ListBuffer.empty[Any] @@ -19,10 +37,23 @@ class NumericParserSuite extends FunSuite { } token = tokenizer.next() } - val expected = Seq('(', '(', 1.0, 2.0, ')', 4.0, '[', 5.0, 6.0, ']', 8.0, ')') + val expected = Seq('(', '(', 1.0, 2e3, ')', -4.0, '[', 5e-6, 7e8, ']', 9.0, ')') assert(expected === tokens) } + test("tokenizer on malformatted strings") { + val malformatted = Seq("a", "[1,,]", "0.123.4", "1 2", "3+4") + malformatted.foreach { s => + intercept[RuntimeException] { + val tokenizer = new NumericTokenizer(s) + while (tokenizer.next() != NumericTokenizer.END) { + // do nothing + } + println(s"Didn't detect malformatted string $s.") + } + } + } + test("parser") { val s = "((1,2),4,[5,6],8)" val parsed = NumericParser.parse(s).asInstanceOf[Seq[_]] From 7aac03ae58d2e983ac3f0bc5909eab34c73f32e3 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 8 May 2014 17:20:39 -0700 Subject: [PATCH 12/26] remove Scala parsers --- .../apache/spark/mllib/linalg/Vectors.scala | 26 ------------------- .../spark/mllib/regression/LabeledPoint.scala | 26 ++----------------- .../spark/mllib/util/NumericParserSuite.scala | 2 -- 3 files changed, 2 insertions(+), 52 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 909c77b98f63..85da0c6cf806 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -22,7 +22,6 @@ import 
java.util.Arrays import scala.annotation.varargs import scala.collection.JavaConverters._ -import scala.util.parsing.combinator.JavaTokenParsers import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV} @@ -213,28 +212,3 @@ class SparseVector( private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, size) } - -/** - * Parsers for string representation of [[org.apache.spark.mllib.linalg.Vector]]. - */ -private[mllib] class VectorParsers extends JavaTokenParsers { - lazy val indices: Parser[Array[Int]] = "[" ~ repsep(wholeNumber, ",") ~ "]" ^^ { - case "[" ~ ii ~ "]" => ii.map(_.toInt).toArray - } - lazy val values: Parser[Array[Double]] = "[" ~ repsep(floatingPointNumber, ",") ~ "]" ^^ { - case "[" ~ vv ~ "]" => vv.map(_.toDouble).toArray - } - lazy val denseVector: Parser[DenseVector] = values ^^ { - case vv => new DenseVector(vv) - } - lazy val sparseVector: Parser[SparseVector] = - "(" ~ wholeNumber ~ "," ~ indices ~ "," ~ values ~ ")" ^^ { - case "(" ~ size ~ "," ~ ii ~ "," ~ vv ~ ")" => new SparseVector(size.toInt, ii, vv) - } - lazy val vector: Parser[Vector] = denseVector | sparseVector -} - -private[mllib] object VectorParsers extends VectorParsers { - /** Parses a string into an [[org.apache.spark.mllib.linalg.Vector]]. */ - def parse(s: String): Vector = parse(vector, s).get -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index 944380319b93..0ea3b8580bf9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.regression -import org.apache.spark.mllib.linalg.{Vectors, Vector, VectorParsers} +import org.apache.spark.mllib.linalg.{Vectors, Vector} import org.apache.spark.mllib.util.NumericParser /** @@ -45,7 +45,7 @@ object LabeledPoint { case other => sys.error(s"Cannot parse $other.") } - } else { + } else { // dense format used before v1.0 val parts = s.split(',') val label = parts(0).toDouble val features = Vectors.dense(parts(1).trim().split(' ').map(_.toDouble)) @@ -53,25 +53,3 @@ object LabeledPoint { } } } - -/** - * Parsers for string representation of [[org.apache.spark.mllib.regression.LabeledPoint]]. - */ -private[mllib] class LabeledPointParsers extends VectorParsers { - /** Parser for the dense format used before v1.0. */ - lazy val labeledPointV0: Parser[LabeledPoint] = - floatingPointNumber ~ "," ~ rep(floatingPointNumber) ^^ { - case l ~ "," ~ vv => LabeledPoint(l.toDouble, Vectors.dense(vv.map(_.toDouble).toArray)) - } - /** Parser for strings resulted from `LabeledPoint#toString` in v1.0. */ - lazy val labeledPointV1: Parser[LabeledPoint] = - "(" ~ floatingPointNumber ~ "," ~ vector ~ ")" ^^ { - case "(" ~ l ~ "," ~ v ~ ")" => LabeledPoint(l.toDouble, v) - } - lazy val labeledPoint: Parser[LabeledPoint] = labeledPointV1 | labeledPointV0 -} - -private[mllib] object LabeledPointParsers extends LabeledPointParsers { - /** Parses a string into an [[org.apache.spark.mllib.regression.LabeledPoint]]. 
*/ - def parse(s: String): LabeledPoint = parse(labeledPoint, s).get -} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala index 76c67f77d4f9..b8437f7a8601 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala @@ -39,9 +39,7 @@ class NumericParserSuite extends FunSuite { } val expected = Seq('(', '(', 1.0, 2e3, ')', -4.0, '[', 5e-6, 7e8, ']', 9.0, ')') assert(expected === tokens) - } - test("tokenizer on malformatted strings") { val malformatted = Seq("a", "[1,,]", "0.123.4", "1 2", "3+4") malformatted.foreach { s => intercept[RuntimeException] { From 810d6df2701a44c95fe71bf1aae0c2ed13242396 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 14 May 2014 01:21:50 -0700 Subject: [PATCH 13/26] update tokenizer/parser implementation --- .../apache/spark/mllib/linalg/Vectors.scala | 3 +- .../spark/mllib/regression/LabeledPoint.scala | 3 +- .../spark/mllib/util/NumericParser.scala | 109 ++++++++++-------- .../spark/mllib/linalg/VectorsSuite.scala | 8 +- .../spark/mllib/util/NumericParserSuite.scala | 4 +- 5 files changed, 73 insertions(+), 54 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 85da0c6cf806..0ae1f33d0a0b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -26,6 +26,7 @@ import scala.collection.JavaConverters._ import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV} import org.apache.spark.mllib.util.NumericParser +import org.apache.spark.SparkException /** * Represents a numeric vector, whose index type is Int and value type is Double. @@ -141,7 +142,7 @@ object Vectors { case Seq(size: Double, indices: Array[Double], values: Array[Double]) => Vectors.sparse(size.toInt, indices.map(_.toInt), values) case other => - sys.error(s"Cannot parse $other.") + throw new SparkException(s"Cannot parse $other.") } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index 0ea3b8580bf9..e6368e869544 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -19,6 +19,7 @@ package org.apache.spark.mllib.regression import org.apache.spark.mllib.linalg.{Vectors, Vector} import org.apache.spark.mllib.util.NumericParser +import org.apache.spark.SparkException /** * Class that represents the features and labels of a data point. 
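By this point in the series `LabeledPoint.parse` is expected (per LabeledPointSuite) to accept both the `(label,features)` format produced by the new `toString` and the bare dense format used before v1.0. A hedged sketch of both, reusing the sample values from the tests:

    import org.apache.spark.mllib.regression.LabeledPoint

    // v1.0 format: "(label,features)" with dense or sparse features
    val p1 = LabeledPoint.parse("(1.0,[1.0,0.0])")
    val p2 = LabeledPoint.parse("(0.0,(2,[1],[-1.0]))")
    // pre-1.0 dense format: "label,f0 f1 f2"
    val p0 = LabeledPoint.parse("1.0,1.0 0.0 -2.0")
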
@@ -43,7 +44,7 @@ object LabeledPoint { case Seq(label: Double, numeric: Any) => LabeledPoint(label, Vectors.parseNumeric(numeric)) case other => - sys.error(s"Cannot parse $other.") + throw new SparkException(s"Cannot parse $other.") } } else { // dense format used before v1.0 val parts = s.split(',') diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala index 386da3b6f080..d390498958d8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala @@ -19,6 +19,8 @@ package org.apache.spark.mllib.util import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import org.apache.spark.SparkException + private[mllib] object NumericTokenizer { val NUMBER = -1 val END = -2 @@ -61,39 +63,43 @@ private[mllib] class NumericTokenizer(s: String, start: Int, end: Int) { */ def next(): Int = { if (cur < end) { - val c = s(cur) - c match { - case '(' | '[' => - allowComma = false - cur += 1 - c - case ')' | ']' => - allowComma = true + val c = s.charAt(cur) + if (c == '(' || c == '[') { + allowComma = false + cur += 1 + c + } else if (c == ')' || c == ']') { + allowComma = true + cur += 1 + c + } else if (c == ',') { + if (allowComma) { cur += 1 - c - case ',' => - if (allowComma) { - cur += 1 - allowComma = false - next() + allowComma = false + next() + } else { + throw new SparkException(s"Found a ',' at a wrong location: $cur.") + } + } else { + // expecting a number + var inNumber = true + val beginAt = cur + while (cur < end && inNumber) { + val d = s.charAt(cur) + if (d == ')' || d == ']' || d == ',') { + inNumber = false } else { - sys.error("Found a ',' at a wrong location.") - } - case other => // expecting a number - var inNumber = true - val sb = new StringBuilder() - while (cur < end && inNumber) { - val d = s(cur) - if (d == ')' || d == ']' || d == ',') { - inNumber = false - } else { - sb.append(d) - cur += 1 - } + cur += 1 } - _value = sb.toString().toDouble - allowComma = true - NUMBER + } + try { + _value = java.lang.Double.parseDouble(s.substring(beginAt, cur)) + } catch { + case e: Throwable => + throw new SparkException("Error parsing a number", e) + } + allowComma = true + NUMBER } } else { END @@ -110,15 +116,17 @@ private[mllib] object NumericParser { def parse(s: String): Any = parse(new NumericTokenizer(s)) private def parse(tokenizer: NumericTokenizer): Any = { - tokenizer.next() match { - case '(' => - parseTuple(tokenizer) - case '[' => - parseArray(tokenizer) - case NUMBER => - tokenizer.value - case END => - null + val token = tokenizer.next() + if (token == NUMBER) { + tokenizer.value + } else if (token == '(') { + parseTuple(tokenizer) + } else if (token == '[') { + parseArray(tokenizer) + } else if (token == END) { + null + } else { + throw new SparkException(s"Cannot recgonize token type: $token.") } } @@ -129,7 +137,9 @@ private[mllib] object NumericParser { values.append(tokenizer.value) token = tokenizer.next() } - require(token == ']') + if (token != ']') { + throw new SparkException(s"An array must end with ] but got $token.") + } values.toArray } @@ -137,17 +147,20 @@ private[mllib] object NumericParser { val items = ListBuffer.empty[Any] var token = tokenizer.next() while (token != ')' && token != END) { - token match { - case '(' => - items.append(parseTuple(tokenizer)) - case '[' => - items.append(parseArray(tokenizer)) - case NUMBER => - items.append(tokenizer.value) + 
if (token == NUMBER) { + items.append(tokenizer.value) + } else if (token == '(') { + items.append(parseTuple(tokenizer)) + } else if (token == '[') { + items.append(parseArray(tokenizer)) + } else { + throw new SparkException(s"Cannot recognize token type: $token.") } token = tokenizer.next() } - require(token == ')') + if (token != ')') { + throw new SparkException(s"A tuple must end with ) but got $token.") + } items.toSeq } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 53594694ff18..7972ceea1fe8 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.mllib.linalg import org.scalatest.FunSuite +import org.apache.spark.SparkException + class VectorsSuite extends FunSuite { val arr = Array(0.1, 0.0, 0.3, 0.4) @@ -105,7 +107,7 @@ class VectorsSuite extends FunSuite { val vectors = Seq( Vectors.dense(Array.empty[Double]), Vectors.dense(1.0), - Vectors.dense(1.0, 0.0, -2.0), + Vectors.dense(1.0E6, 0.0, -2.0e-7), Vectors.sparse(0, Array.empty[Int], Array.empty[Double]), Vectors.sparse(1, Array(0), Array(1.0)), Vectors.sparse(3, Array(0, 2), Array(1.0, -2.0))) @@ -115,9 +117,9 @@ class VectorsSuite extends FunSuite { assert(v === v1) } - val malformatted = Seq("1", "[1,,]", "[1,2", "(1,[1,2])", "(1,[1],[2.0,1.0])") + val malformatted = Seq("1", "[1,,]", "[1,2b]", "(1,[1,2])", "([1],[2.0,1.0])") malformatted.foreach { s => - intercept[RuntimeException] { + intercept[SparkException] { Vectors.parse(s) println(s"Didn't detect malformatted string $s.") } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala index b8437f7a8601..0591257c0ab1 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala @@ -21,6 +21,8 @@ import scala.collection.mutable.ListBuffer import org.scalatest.FunSuite +import org.apache.spark.SparkException + class NumericParserSuite extends FunSuite { test("tokenizer") { @@ -42,7 +44,7 @@ class NumericParserSuite extends FunSuite { val malformatted = Seq("a", "[1,,]", "0.123.4", "1 2", "3+4") malformatted.foreach { s => - intercept[RuntimeException] { + intercept[SparkException] { val tokenizer = new NumericTokenizer(s) while (tokenizer.next() != NumericTokenizer.END) { // do nothing From aea4ae3e81db4819b92e9627d63cfd284a971bfe Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 14 May 2014 09:09:43 -0700 Subject: [PATCH 14/26] minor updates --- .../main/scala/org/apache/spark/mllib/linalg/Vectors.scala | 6 ++---- .../org/apache/spark/mllib/regression/LabeledPoint.scala | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 0ae1f33d0a0b..2cf863668adf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -195,10 +195,8 @@ class SparseVector( require(indices.length == values.length) - override def toString: String = { - Seq(size, indices.mkString("[", ",", "]"), values.mkString("[", ",", "]")) - .mkString("(", ",", ")") - } + override def toString: String = + 
"(%s,%s,%s)".format(size, indices.mkString("[", ",", "]"), values.mkString("[", ",", "]")) override def toArray: Array[Double] = { val data = new Array[Double](size) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index e6368e869544..74a8f974ed7c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -29,7 +29,7 @@ import org.apache.spark.SparkException */ case class LabeledPoint(label: Double, features: Vector) { override def toString: String = { - Seq(label, features).mkString("(", ",", ")") + "(%s,%s)".format(label, features) } } From e9fcd498025854c13fd9f0513c63f5f1bbf27297 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 14 May 2014 09:32:41 -0700 Subject: [PATCH 15/26] add serializeLabeledPoint and tests --- .../mllib/api/python/PythonMLLibAPI.scala | 18 +++++- .../api/python/PythonMLLibAPISuite.scala | 60 +++++++++++++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 7c65b0d4750f..b0d0f37385d3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -27,6 +27,7 @@ import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} import org.apache.spark.mllib.recommendation._ import org.apache.spark.mllib.regression._ import org.apache.spark.rdd.RDD +import org.apache.spark.SparkException /** * :: DeveloperApi :: @@ -41,7 +42,7 @@ class PythonMLLibAPI extends Serializable { private val DENSE_MATRIX_MAGIC: Byte = 3 private val LABELED_POINT_MAGIC: Byte = 4 - private def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = { + private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = { require(bytes.length - offset >= 5, "Byte array too short") val magic = bytes(offset) if (magic == DENSE_VECTOR_MAGIC) { @@ -116,7 +117,7 @@ class PythonMLLibAPI extends Serializable { bytes } - private def serializeDoubleVector(vector: Vector): Array[Byte] = vector match { + private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match { case s: SparseVector => serializeSparseVector(s) case _ => @@ -167,7 +168,18 @@ class PythonMLLibAPI extends Serializable { bytes } - private def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = { + private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = { + val fb = serializeDoubleVector(p.features) + val bytes = new Array[Byte](1 + 8 + fb.length) + val bb = ByteBuffer.wrap(bytes) + bb.order(ByteOrder.nativeOrder()) + bb.put(LABELED_POINT_MAGIC) + bb.putDouble(p.label) + bb.put(fb) + bytes + } + + private[python] def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = { require(bytes.length >= 9, "Byte array too short") val magic = bytes(0) if (magic != LABELED_POINT_MAGIC) { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala new file mode 100644 index 000000000000..642843f90204 --- /dev/null +++ 
b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.api.python + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.regression.LabeledPoint + +class PythonMLLibAPISuite extends FunSuite { + val py = new PythonMLLibAPI + + test("vector serialization") { + val vectors = Seq( + Vectors.dense(Array.empty[Double]), + Vectors.dense(0.0), + Vectors.dense(0.0, -2.0), + Vectors.sparse(0, Array.empty[Int], Array.empty[Double]), + Vectors.sparse(1, Array.empty[Int], Array.empty[Double]), + Vectors.sparse(2, Array(1), Array(-2.0))) + vectors.foreach { v => + val bytes = py.serializeDoubleVector(v) + val u = py.deserializeDoubleVector(bytes) + assert(u.getClass === v.getClass) + assert(u === v) + } + } + + test("labeled point serialization") { + val points = Seq( + LabeledPoint(0.0, Vectors.dense(Array.empty[Double])), + LabeledPoint(1.0, Vectors.dense(0.0)), + LabeledPoint(-0.5, Vectors.dense(0.0, -2.0)), + LabeledPoint(0.0, Vectors.sparse(0, Array.empty[Int], Array.empty[Double])), + LabeledPoint(1.0, Vectors.sparse(1, Array.empty[Int], Array.empty[Double])), + LabeledPoint(-0.5, Vectors.sparse(2, Array(1), Array(-2.0)))) + points.foreach { p => + val bytes = py.serializeLabeledPoint(p) + val q = py.deserializeLabeledPoint(bytes) + assert(q.label === p.label) + assert(q.features.getClass === p.features.getClass) + assert(q.features === p.features) + } + } +} From ce9a47575897c55511a6d0f03f4077358746107e Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 14 May 2014 10:09:02 -0700 Subject: [PATCH 16/26] add deserialize_labeled_point to pyspark with tests --- python/pyspark/mllib/_common.py | 72 +++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 21 deletions(-) diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index e6f0953810ed..8e128489c567 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -22,6 +22,7 @@ from pyspark.mllib.linalg import SparseVector from pyspark.serializers import Serializer + """ Common utilities shared throughout MLlib, primarily for dealing with different data types. These include: @@ -146,7 +147,7 @@ def _serialize_sparse_vector(v): return ba -def _deserialize_double_vector(ba): +def _deserialize_double_vector(ba, offset=0): """Deserialize a double vector from a mutually understood format. 
>>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0]) @@ -159,43 +160,46 @@ def _deserialize_double_vector(ba): if type(ba) != bytearray: raise TypeError("_deserialize_double_vector called on a %s; " "wanted bytearray" % type(ba)) - if len(ba) < 5: + nb = len(ba) - offset + if nb < 5: raise TypeError("_deserialize_double_vector called on a %d-byte array, " - "which is too short" % len(ba)) - if ba[0] == DENSE_VECTOR_MAGIC: - return _deserialize_dense_vector(ba) - elif ba[0] == SPARSE_VECTOR_MAGIC: - return _deserialize_sparse_vector(ba) + "which is too short" % nb) + if ba[offset] == DENSE_VECTOR_MAGIC: + return _deserialize_dense_vector(ba, offset) + elif ba[offset] == SPARSE_VECTOR_MAGIC: + return _deserialize_sparse_vector(ba, offset) else: raise TypeError("_deserialize_double_vector called on bytearray " "with wrong magic") -def _deserialize_dense_vector(ba): +def _deserialize_dense_vector(ba, offset=0): """Deserialize a dense vector into a numpy array.""" - if len(ba) < 5: + nb = len(ba) - offset + if nb < 5: raise TypeError("_deserialize_dense_vector called on a %d-byte array, " - "which is too short" % len(ba)) - length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0] - if len(ba) != 8 * length + 5: + "which is too short" % nb) + length = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=int32)[0] + if nb < 8 * length + 5: raise TypeError("_deserialize_dense_vector called on bytearray " "with wrong length") - return _deserialize_numpy_array([length], ba, 5) + return _deserialize_numpy_array([length], ba, offset + 5) -def _deserialize_sparse_vector(ba): +def _deserialize_sparse_vector(ba, offset=0): """Deserialize a sparse vector into a MLlib SparseVector object.""" - if len(ba) < 9: + nb = len(ba) - offset + if nb < 9: raise TypeError("_deserialize_sparse_vector called on a %d-byte array, " - "which is too short" % len(ba)) - header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32) + "which is too short" % nb) + header = ndarray(shape=[2], buffer=ba, offset=offset + 1, dtype=int32) size = header[0] nonzeros = header[1] - if len(ba) != 9 + 12 * nonzeros: + if nb < 9 + 12 * nonzeros: raise TypeError("_deserialize_sparse_vector called on bytearray " "with wrong length") - indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype=int32) - values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype=float64) + indices = _deserialize_numpy_array([nonzeros], ba, offset + 9, dtype=int32) + values = _deserialize_numpy_array([nonzeros], ba, offset + 9 + 4 * nonzeros, dtype=float64) return SparseVector(int(size), indices, values) @@ -242,7 +246,23 @@ def _deserialize_double_matrix(ba): def _serialize_labeled_point(p): - """Serialize a LabeledPoint with a features vector of any type.""" + """ + Serialize a LabeledPoint with a features vector of any type.
+ + >>> from pyspark.mllib.regression import LabeledPoint + >>> dp0 = LabeledPoint(0.5, array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])) + >>> dp1 = _deserialize_labeled_point(_serialize_labeled_point(dp0)) + >>> dp1.label == dp0.label + True + >>> array_equal(dp1.features, dp0.features) + True + >>> sp0 = LabeledPoint(0.0, SparseVector(4, [1, 3], [3.0, 5.5])) + >>> sp1 = _deserialize_labeled_point(_serialize_labeled_point(sp0)) + >>> sp1.label == sp0.label + True + >>> sp1.features == sp0.features + True + """ from pyspark.mllib.regression import LabeledPoint serialized_features = _serialize_double_vector(p.features) header = bytearray(9) @@ -251,6 +271,16 @@ def _serialize_labeled_point(p): header_float[0] = p.label return header + serialized_features +def _deserialize_labeled_point(ba, offset=0): + """Deserialize a LabeledPoint from a mutually understood format.""" + from pyspark.mllib.regression import LabeledPoint + if type(ba) != bytearray: + raise TypeError("Expecting a bytearray but got %s" % type(ba)) + if ba[offset] != LABELED_POINT_MAGIC: + raise TypeError("Expecting magic number %d but got %d" % (LABELED_POINT_MAGIC, ba[0])) + label = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=float64)[0] + features = _deserialize_double_vector(ba, offset + 9) + return LabeledPoint(label, features) def _copyto(array, buffer, offset, shape, dtype): """ From a41675af2de2eb5c204cd8782803c95e627e2448 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 14 May 2014 10:59:13 -0700 Subject: [PATCH 17/26] python loadLabeledPoint uses Scala's implementation --- .../spark/mllib/api/python/PythonMLLibAPI.scala | 10 ++++++++-- python/pyspark/mllib/util.py | 11 +++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index b0d0f37385d3..762ac14a655f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -20,14 +20,14 @@ package org.apache.spark.mllib.api.python import java.nio.{ByteBuffer, ByteOrder} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.java.{JavaSparkContext, JavaRDD} import org.apache.spark.mllib.classification._ import org.apache.spark.mllib.clustering._ import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} import org.apache.spark.mllib.recommendation._ import org.apache.spark.mllib.regression._ +import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD -import org.apache.spark.SparkException /** * :: DeveloperApi :: @@ -191,6 +191,12 @@ class PythonMLLibAPI extends Serializable { LabeledPoint(label, deserializeDoubleVector(bytes, 9)) } + def loadLabeledPoints( + jsc: JavaSparkContext, + path: String, + minPartitions: Int): JavaRDD[Array[Byte]] = + MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint).toJavaRDD() + private def trainRegressionModel( trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel, dataBytesJRDD: JavaRDD[Array[Byte]], diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index 48a79e8e1bce..e2e59c1ef3dc 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -19,7 +19,10 @@ from pyspark.mllib.linalg import Vectors, SparseVector from pyspark.mllib.regression import LabeledPoint -from
pyspark.mllib._common import _convert_vector +from pyspark.mllib._common import _convert_vector, _deserialize_labeled_point +from pyspark.rdd import RDD +from pyspark.serializers import NoOpSerializer + class MLUtils: """ @@ -182,7 +185,11 @@ def loadLabeledPoints(sc, path, minPartitions=None): >>> print examples[1] (0.0,[1.01,2.02,3.03]) """ - return sc.textFile(path, minPartitions).map(LabeledPoint.parse) + minPartitions = minPartitions or min(sc.defaultParallelism, 2) + jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions) + serialized = RDD(jSerialized, sc, NoOpSerializer()) + return serialized.map(lambda bytes: _deserialize_labeled_point(bytearray(bytes))) + def _test(): import doctest From f644438837db6d23c549da459ca7e86c6c3edbbd Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 14 May 2014 11:02:35 -0700 Subject: [PATCH 18/26] remove parse methods based on eval from pyspark --- python/pyspark/mllib/linalg.py | 24 ------------------------ python/pyspark/mllib/regression.py | 17 ----------------- 2 files changed, 41 deletions(-) diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 7fa002d3416e..d176b6c58cde 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -232,30 +232,6 @@ def dense(elements): """ return array(elements, dtype=float64) - - @staticmethod - def parse(s): - """ - Parses a string resulted from Vectors.stringify() into a vector. - - >>> Vectors.parse("[0.0,1.0]") - array([ 0., 1.]) - >>> print Vectors.parse("(2,[1],[1.0])") - (2,[1],[1.0]) - """ - return Vectors._parse_structured(eval(s)) - - - @staticmethod - def _parse_structured(data): - if type(data) == list: - return Vectors.dense(data) - elif type(data) == tuple: - return Vectors.sparse(data[0], data[1], data[2]) - else: - raise SyntaxError("Cannot recognize " + data) - - @staticmethod def stringify(vector): """ diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 7dcf98bfb31e..69226c570c89 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -48,23 +48,6 @@ def __str__(self): return "(" + ",".join((str(self.label), Vectors.stringify(self.features))) + ")" - @staticmethod - def parse(s): - """ - Parses a string resulted from str() to a LabeledPoint. 
- - >>> print LabeledPoint.parse("(1.0,[0.0,1.0])") - (1.0,[0.0,1.0]) - >>> print LabeledPoint.parse("(1.0,(2,[1],[1.0]))") - (1.0,(2,[1],[1.0])) - """ - return LabeledPoint._parse_structured(eval(s)) - - - @staticmethod - def _parse_structured(data): - return LabeledPoint(data[0], Vectors._parse_structured(data[1])) - class LinearModel(object): """A linear model that has a vector of coefficients and an intercept.""" def __init__(self, weights, intercept): From 050fca42ef54680026294dd8af4f6b33b5a61428 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 14 May 2014 12:12:30 -0700 Subject: [PATCH 19/26] use StringTokenizer --- .../spark/mllib/util/NumericParser.scala | 51 +++++-------------- 1 file changed, 14 insertions(+), 37 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala index d390498958d8..75a5af99a658 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala @@ -17,6 +17,8 @@ package org.apache.spark.mllib.util +import java.util.StringTokenizer + import scala.collection.mutable.{ArrayBuffer, ListBuffer} import org.apache.spark.SparkException @@ -36,19 +38,12 @@ import NumericTokenizer._ * - tuple: a list of numbers, arrays, or tuples stored as `(...)` * * @param s input string - * @param start start index - * @param end end index */ -private[mllib] class NumericTokenizer(s: String, start: Int, end: Int) { - - /** - * Creates a tokenizer for the entire input string. - */ - def this(s: String) = this(s, 0, s.length) +private[mllib] class NumericTokenizer(s: String) { - private var cur = start private var allowComma = false private var _value = Double.NaN + private val stringTokenizer = new StringTokenizer(s, "()[],", true) /** * Returns the most recent parsed number. @@ -62,42 +57,24 @@ private[mllib] class NumericTokenizer(s: String, start: Int, end: Int) { * - [[org.apache.spark.mllib.util.NumericTokenizer#END]]. 
*/ def next(): Int = { - if (cur < end) { - val c = s.charAt(cur) - if (c == '(' || c == '[') { + if (stringTokenizer.hasMoreTokens()) { + val token = stringTokenizer.nextToken() + if (token == "(" || token == "[") { allowComma = false - cur += 1 - c - } else if (c == ')' || c == ']') { + token.charAt(0) + } else if (token == ")" || token == "]") { allowComma = true - cur += 1 - c - } else if (c == ',') { + token.charAt(0) + } else if (token == ",") { if (allowComma) { - cur += 1 allowComma = false next() } else { - throw new SparkException(s"Found a ',' at a wrong location: $cur.") + throw new SparkException("Found a ',' at a wrong position.") } } else { // expecting a number - var inNumber = true - val beginAt = cur - while (cur < end && inNumber) { - val d = s.charAt(cur) - if (d == ')' || d == ']' || d == ',') { - inNumber = false - } else { - cur += 1 - } - } - try { - _value = java.lang.Double.parseDouble(s.substring(beginAt, cur)) - } catch { - case e: Throwable => - throw new SparkException("Error parsing a number", e) - } + _value = java.lang.Double.parseDouble(token) allowComma = true NUMBER } @@ -126,7 +103,7 @@ private[mllib] object NumericParser { } else if (token == END) { null } else { - throw new SparkException(s"Cannot recgonize token type: $token.") + throw new SparkException(s"Cannot recognize token type: $token.") } } From e86bf3825664aa5fdae5d7fe9d0a45ec4abfe21e Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 14 May 2014 12:48:18 -0700 Subject: [PATCH 20/26] remove NumericTokenizer --- .../spark/mllib/util/NumericParser.scala | 139 +++++++----------- .../spark/mllib/util/NumericParserSuite.scala | 37 +---- 2 files changed, 61 insertions(+), 115 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala index 75a5af99a658..6b75e7d8b16f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala @@ -23,120 +23,89 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer} import org.apache.spark.SparkException -private[mllib] object NumericTokenizer { - val NUMBER = -1 - val END = -2 -} - -import NumericTokenizer._ - /** - * Simple tokenizer for a numeric structure consisting of three types: + * Simple parser for a numeric structure consisting of three types: * * - number: a double in Java's floating number format * - array: an array of numbers stored as `[v0,v1,...,vn]` * - tuple: a list of numbers, arrays, or tuples stored as `(...)` - * - * @param s input string */ -private[mllib] class NumericTokenizer(s: String) { - - private var allowComma = false - private var _value = Double.NaN - private val stringTokenizer = new StringTokenizer(s, "()[],", true) +private[mllib] object NumericParser { - /** - * Returns the most recent parsed number. - */ - def value: Double = _value + /** Parses a string into a Double, an Array[Double], or a Seq[Any]. 
*/ + def parse(s: String): Any = { + val tokenizer = new StringTokenizer(s, "()[],", true) + if (tokenizer.hasMoreTokens()) { + val token = tokenizer.nextToken() + if (token == "(") { + parseTuple(tokenizer) + } else if (token == "[") { + parseArray(tokenizer) + } else { + // expecting a number + java.lang.Double.parseDouble(token) + } + } else { + throw new SparkException(s"Cannot find any token from the input string.") + } + } - /** - * Returns the next token, which could be any of the following: - * - '[', ']', '(', or ')'. - * - [[org.apache.spark.mllib.util.NumericTokenizer#NUMBER]], call value() to get its value. - * - [[org.apache.spark.mllib.util.NumericTokenizer#END]]. - */ - def next(): Int = { - if (stringTokenizer.hasMoreTokens()) { - val token = stringTokenizer.nextToken() - if (token == "(" || token == "[") { - allowComma = false - token.charAt(0) - } else if (token == ")" || token == "]") { - allowComma = true - token.charAt(0) + private def parseArray(tokenizer: StringTokenizer): Array[Double] = { + val values = ArrayBuffer.empty[Double] + var parsing = true + var allowComma = false + var token: String = null + while (parsing && tokenizer.hasMoreTokens()) { + token = tokenizer.nextToken() + if (token == "]") { + parsing = false } else if (token == ",") { if (allowComma) { allowComma = false - next() } else { throw new SparkException("Found a ',' at a wrong position.") } } else { // expecting a number - _value = java.lang.Double.parseDouble(token) + values.append(java.lang.Double.parseDouble(token)) allowComma = true - NUMBER } - } else { - END } - } -} - -/** - * Simple parser for tokens from [[org.apache.spark.mllib.util.NumericTokenizer]]. - */ -private[mllib] object NumericParser { - - /** Parses a string into a Double, an Array[Double], or a Seq[Any]. 
*/ - def parse(s: String): Any = parse(new NumericTokenizer(s)) - - private def parse(tokenizer: NumericTokenizer): Any = { - val token = tokenizer.next() - if (token == NUMBER) { - tokenizer.value - } else if (token == '(') { - parseTuple(tokenizer) - } else if (token == '[') { - parseArray(tokenizer) - } else if (token == END) { - null - } else { - throw new SparkException(s"Cannot recognize token type: $token.") - } - } - - private def parseArray(tokenizer: NumericTokenizer): Array[Double] = { - val values = ArrayBuffer.empty[Double] - var token = tokenizer.next() - while (token == NUMBER) { - values.append(tokenizer.value) - token = tokenizer.next() - } - if (token != ']') { - throw new SparkException(s"An array must end with ] but got $token.") + if (parsing) { + throw new SparkException(s"An array must end with ']'.") } values.toArray } - private def parseTuple(tokenizer: NumericTokenizer): Seq[_] = { + private def parseTuple(tokenizer: StringTokenizer): Seq[_] = { val items = ListBuffer.empty[Any] - var token = tokenizer.next() - while (token != ')' && token != END) { - if (token == NUMBER) { - items.append(tokenizer.value) - } else if (token == '(') { + var parsing = true + var allowComma = false + var token: String = null + while (parsing && tokenizer.hasMoreTokens()) { + token = tokenizer.nextToken() + if (token == "(") { items.append(parseTuple(tokenizer)) - } else if (token == '[') { + allowComma = true + } else if (token == "[") { items.append(parseArray(tokenizer)) + allowComma = true + } else if (token == ",") { + if (allowComma) { + allowComma = false + } else { + throw new SparkException("Found a ',' at a wrong position.") + } + } else if (token == ")") { + parsing = false } else { - throw new SparkException(s"Cannot recognize token type: $token.") + // expecting a number + items.append(java.lang.Double.parseDouble(token)) + allowComma = true } - token = tokenizer.next() } - if (token != ')') { - throw new SparkException(s"A tuple must end with ) but got $token.") + if (parsing) { + throw new SparkException(s"A tuple must with ')'.") } items.toSeq } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala index 0591257c0ab1..f68fb95eac4e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/NumericParserSuite.scala @@ -17,49 +17,26 @@ package org.apache.spark.mllib.util -import scala.collection.mutable.ListBuffer - import org.scalatest.FunSuite import org.apache.spark.SparkException class NumericParserSuite extends FunSuite { - test("tokenizer") { + test("parser") { val s = "((1.0,2e3),-4,[5e-6,7.0E8],+9)" - val tokenizer = new NumericTokenizer(s) - var token = tokenizer.next() - val tokens = ListBuffer.empty[Any] - while (token != NumericTokenizer.END) { - token match { - case NumericTokenizer.NUMBER => - tokens.append(tokenizer.value) - case other => - tokens.append(token) - } - token = tokenizer.next() - } - val expected = Seq('(', '(', 1.0, 2e3, ')', -4.0, '[', 5e-6, 7e8, ']', 9.0, ')') - assert(expected === tokens) + val parsed = NumericParser.parse(s).asInstanceOf[Seq[_]] + assert(parsed(0).asInstanceOf[Seq[_]] === Seq(1.0, 2.0e3)) + assert(parsed(1).asInstanceOf[Double] === -4.0) + assert(parsed(2).asInstanceOf[Array[Double]] === Array(5.0e-6, 7.0e8)) + assert(parsed(3).asInstanceOf[Double] === 9.0) val malformatted = Seq("a", "[1,,]", "0.123.4", "1 2", "3+4") 
malformatted.foreach { s => intercept[SparkException] { - val tokenizer = new NumericTokenizer(s) - while (tokenizer.next() != NumericTokenizer.END) { - // do nothing - } + NumericParser.parse(s) println(s"Didn't detect malformatted string $s.") } } } - - test("parser") { - val s = "((1,2),4,[5,6],8)" - val parsed = NumericParser.parse(s).asInstanceOf[Seq[_]] - assert(parsed(0).asInstanceOf[Seq[_]] === Seq(1.0, 2.0)) - assert(parsed(1).asInstanceOf[Double] === 4.0) - assert(parsed(2).asInstanceOf[Array[Double]] === Array(5.0, 6.0)) - assert(parsed(3).asInstanceOf[Double] === 8.0) - } } From 5bcfbc4e0a8f1df23543288c2f041c31efb0aa03 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 14 May 2014 14:23:22 -0700 Subject: [PATCH 21/26] update test to add scientific notations --- python/pyspark/mllib/util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index e2e59c1ef3dc..7ffa0dd8f939 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -170,7 +170,7 @@ def loadLabeledPoints(sc, path, minPartitions=None): >>> from tempfile import NamedTemporaryFile >>> from pyspark.mllib.util import MLUtils - >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), \ + >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])), \ LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))] >>> tempFile = NamedTemporaryFile(delete=True) >>> tempFile.close() @@ -179,7 +179,7 @@ def loadLabeledPoints(sc, path, minPartitions=None): >>> type(loaded[0]) == LabeledPoint True >>> print examples[0] - (1.1,(3,[0,2],[1.23,4.56])) + (1.1,(3,[0,2],[-1.23,4.56e-07])) >>> type(examples[1]) == LabeledPoint True >>> print examples[1] From 640fe0c898248e485eea3d550255b25e76bf3993 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 14 May 2014 14:34:28 -0700 Subject: [PATCH 22/26] throw SparkException --- .../apache/spark/mllib/util/NumericParser.scala | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala index 6b75e7d8b16f..5952e63761cb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala @@ -43,7 +43,7 @@ private[mllib] object NumericParser { parseArray(tokenizer) } else { // expecting a number - java.lang.Double.parseDouble(token) + parseDouble(token) } } else { throw new SparkException(s"Cannot find any token from the input string.") @@ -67,7 +67,7 @@ private[mllib] object NumericParser { } } else { // expecting a number - values.append(java.lang.Double.parseDouble(token)) + values.append(parseDouble(token)) allowComma = true } } @@ -100,13 +100,22 @@ private[mllib] object NumericParser { parsing = false } else { // expecting a number - items.append(java.lang.Double.parseDouble(token)) + items.append(parseDouble(token)) allowComma = true } } if (parsing) { - throw new SparkException(s"A tuple must with ')'.") + throw new SparkException(s"A tuple must end with ')'.") } items.toSeq } + + private def parseDouble(s: String): Double = { + try { + java.lang.Double.parseDouble(s) + } catch { + case e: Throwable => + throw new SparkException(s"Cannot parse a double from: $s", e) + } + } } From f06d5bab599b6d1acbe9b6f5ccdb76da1a84a6c9 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 14 May 2014 22:01:10 
-0700 Subject: [PATCH 23/26] add docs and minor updates --- .../org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 7 +++++++ .../org/apache/spark/mllib/regression/LabeledPoint.scala | 6 +++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 762ac14a655f..c44173793b39 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -191,6 +191,13 @@ class PythonMLLibAPI extends Serializable { LabeledPoint(label, deserializeDoubleVector(bytes, 9)) } + /** + * Loads and serializes labeled points saved with `RDD#saveAsTextFile`. + * @param jsc Java SparkContext + * @param path file or directory path in any Hadoop-supported file system URI + * @param minPartitions min number of partitions + * @return serialized labeled points stored in a JavaRDD of byte array + */ def loadLabeledPoints( jsc: JavaSparkContext, path: String, diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index 74a8f974ed7c..f83dece139e3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -39,7 +39,7 @@ object LabeledPoint { * an [[org.apache.spark.mllib.regression.LabeledPoint]]. */ def parse(s: String): LabeledPoint = { - if (s.startsWith("(") || s.startsWith(")")) { + if (s.startsWith("(")) { NumericParser.parse(s) match { case Seq(label: Double, numeric: Any) => LabeledPoint(label, Vectors.parseNumeric(numeric)) @@ -48,8 +48,8 @@ object LabeledPoint { } } else { // dense format used before v1.0 val parts = s.split(',') - val label = parts(0).toDouble - val features = Vectors.dense(parts(1).trim().split(' ').map(_.toDouble)) + val label = java.lang.Double.parseDouble(parts(0)) + val features = Vectors.dense(parts(1).trim().split(' ').map(java.lang.Double.parseDouble)) LabeledPoint(label, features) } } From 56746ea1750612dc771d3f79ee8f5ef4a2a15c10 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 2 Jun 2014 12:05:09 -0700 Subject: [PATCH 24/26] replace # by . --- .../scala/org/apache/spark/mllib/util/MLUtils.scala | 12 ++++++------ .../org/apache/spark/mllib/util/NumericParser.scala | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 123f924145e1..37dcc45c9eea 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -180,7 +180,7 @@ object MLUtils { } /** - * Loads vectors saved using `RDD[Vector]#saveAsTextFile`. + * Loads vectors saved using `RDD[Vector].saveAsTextFile`. * @param sc Spark context * @param path file or directory path in any Hadoop-supported file system URI * @param minPartitions min number of partitions @@ -190,13 +190,13 @@ object MLUtils { sc.textFile(path, minPartitions).map(Vectors.parse) /** - * Loads vectors saved using `RDD[Vector]#saveAsTextFile` with the default number of partitions. + * Loads vectors saved using `RDD[Vector].saveAsTextFile` with the default number of partitions. 
*/ def loadVectors(sc: SparkContext, path: String): RDD[Vector] = sc.textFile(path, sc.defaultMinPartitions).map(Vectors.parse) /** - * Loads labeled points saved using `RDD[LabeledPoint]#saveAsTextFile`. + * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile`. * @param sc Spark context * @param path file or directory path in any Hadoop-supported file system URI * @param minPartitions min number of partitions @@ -206,7 +206,7 @@ object MLUtils { sc.textFile(path, minPartitions).map(LabeledPoint.parse) /** - * Loads labeled points saved using `RDD[LabeledPoint]#saveAsTextFile` with the default number of + * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile` with the default number of * partitions. */ def loadLabeledPoints(sc: SparkContext, dir: String): RDD[LabeledPoint] = @@ -225,7 +225,7 @@ object MLUtils { * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and * [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading. */ - @deprecated("Should use RDD#saveAsTextFile and MLUtils#loadLabeledPoints instead.", "1.0") + @deprecated("Should use MLUtils.loadLabeledPoints instead.", "1.0") def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = { sc.textFile(dir).map { line => val parts = line.split(',') @@ -246,7 +246,7 @@ object MLUtils { * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and * [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading. */ - @deprecated("Should use RDD#saveAsTextFile and MLUtils#loadLabeledPoints instead.", "1.0") + @deprecated("Should use RDD[LabeledPoint].saveAsTextFile instead.", "1.0") def saveLabeledData(data: RDD[LabeledPoint], dir: String) { val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" ")) dataStr.saveAsTextFile(dir) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala index 5952e63761cb..f7cba6c6cb62 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/NumericParser.scala @@ -107,7 +107,7 @@ private[mllib] object NumericParser { if (parsing) { throw new SparkException(s"A tuple must end with ')'.") } - items.toSeq + items } private def parseDouble(s: String): Double = { From 297be75117498916d028547e8f28f26385332f3b Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 3 Jun 2014 12:43:40 -0700 Subject: [PATCH 25/26] change LabeledPoint.parse to LabeledPointParser.parse to maintain binary compatibility --- .../org/apache/spark/mllib/regression/LabeledPoint.scala | 5 ++++- .../src/main/scala/org/apache/spark/mllib/util/MLUtils.scala | 4 ++-- .../apache/spark/mllib/regression/LabeledPointSuite.scala | 4 ++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index f83dece139e3..62a03af4a996 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -33,7 +33,10 @@ case class LabeledPoint(label: Double, features: Vector) { } } -object LabeledPoint { +/** + * Parser for [[org.apache.spark.mllib.regression.LabeledPoint]]. 
+ */ +private[mllib] object LabeledPointParser { /** * Parses a string resulted from `LabeledPoint#toString` into * an [[org.apache.spark.mllib.regression.LabeledPoint]]. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 37dcc45c9eea..151623951378 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -27,7 +27,7 @@ import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PartitionwiseSampledRDD import org.apache.spark.util.random.BernoulliSampler -import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint} import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.storage.StorageLevel @@ -203,7 +203,7 @@ object MLUtils { * @return labeled points stored as an RDD[LabeledPoint] */ def loadLabeledPoints(sc: SparkContext, path: String, minPartitions: Int): RDD[LabeledPoint] = - sc.textFile(path, minPartitions).map(LabeledPoint.parse) + sc.textFile(path, minPartitions).map(LabeledPointParser.parse) /** * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile` with the default number of diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala index 110c44a7193f..d9308aaba6ee 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala @@ -28,12 +28,12 @@ class LabeledPointSuite extends FunSuite { LabeledPoint(1.0, Vectors.dense(1.0, 0.0)), LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0)))) points.foreach { p => - assert(p === LabeledPoint.parse(p.toString)) + assert(p === LabeledPointParser.parse(p.toString)) } } test("parse labeled points with v0.9 format") { - val point = LabeledPoint.parse("1.0,1.0 0.0 -2.0") + val point = LabeledPointParser.parse("1.0,1.0 0.0 -2.0") assert(point === LabeledPoint(1.0, Vectors.dense(1.0, 0.0, -2.0))) } } From 2d1116a4076c822d828f99206195a0a709b94ca9 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 3 Jun 2014 14:41:14 -0700 Subject: [PATCH 26/26] make loadLabeledData/saveLabeledData deprecated since 1.0.1 --- .../src/main/scala/org/apache/spark/mllib/util/MLUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 151623951378..aaf92a1a8869 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -225,7 +225,7 @@ object MLUtils { * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and * [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading. 
*/ - @deprecated("Should use MLUtils.loadLabeledPoints instead.", "1.0") + @deprecated("Should use MLUtils.loadLabeledPoints instead.", "1.0.1") def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = { sc.textFile(dir).map { line => val parts = line.split(',') @@ -246,7 +246,7 @@ object MLUtils { * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and * [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading. */ - @deprecated("Should use RDD[LabeledPoint].saveAsTextFile instead.", "1.0") + @deprecated("Should use RDD[LabeledPoint].saveAsTextFile instead.", "1.0.1") def saveLabeledData(data: RDD[LabeledPoint], dir: String) { val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" ")) dataStr.saveAsTextFile(dir)
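
Illustrative sketch (not part of any patch in this series). After the changes above, vectors and labeled points round-trip through a plain-text format handled by NumericParser: dense vectors print as [v0,v1,...], sparse vectors as (size,[indices],[values]), and labeled points as (label,features). The Scala snippet below exercises that round trip. It is a sketch under stated assumptions, not code from the patches: the package and object names are invented, it must live under a subpackage of org.apache.spark.mllib because Vectors.parse and LabeledPointParser end the series as private[mllib], and the outputs shown in comments are indicative.

package org.apache.spark.mllib.example  // hypothetical package, chosen for private[mllib] access

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LabeledPointParser}

object TextFormatRoundTrip {
  def main(args: Array[String]): Unit = {
    // Dense and sparse vectors and the text they print as.
    val dense = Vectors.dense(1.0, 0.0, -2.0)                     // [1.0,0.0,-2.0]
    val sparse = Vectors.sparse(3, Array(0, 2), Array(1.0, -2.0)) // (3,[0,2],[1.0,-2.0])

    // Vectors.parse feeds the string through NumericParser and rebuilds the same vector type.
    println(Vectors.parse(dense.toString))
    println(Vectors.parse(sparse.toString))

    // A labeled point prints as (label,features); this is the line format read back by
    // MLUtils.loadLabeledPoints and, via PATCH 17, by the Python loadLabeledPoints wrapper.
    val p = LabeledPoint(1.0, sparse)
    println(p)                                     // (1.0,(3,[0,2],[1.0,-2.0]))
    println(LabeledPointParser.parse(p.toString))  // parsed back from its own toString
    println(LabeledPointParser.parse("1.0,1.0 0.0 -2.0"))  // pre-1.0 dense format still accepted
  }
}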