30 commits
19aa523 update toString and add parsers for Vectors and LabeledPoint (mengxr, May 7, 2014)
9e63a02 add loadVectors and loadLabeledPoints (mengxr, May 7, 2014)
e761d32 make LabelPoint.parse compatible with the dense format used before v1… (mengxr, May 7, 2014)
7853f88 update pyspark's SparseVector.__str__ (mengxr, May 7, 2014)
5c2dbfa add parse to pyspark's Vectors (mengxr, May 7, 2014)
a7a178e add stringify to pyspark's Vectors (mengxr, May 7, 2014)
cd6c78f add __str__ and parse to LabeledPoint (mengxr, May 7, 2014)
ea122b5 Merge branch 'master' into labeled-io (mengxr, May 8, 2014)
63dc396 add loadLabeledPoints to pyspark (mengxr, May 8, 2014)
d731817 style update (mengxr, May 8, 2014)
b0c50cb add customized parser (mengxr, May 8, 2014)
c1885c1 add headers and minor changes (mengxr, May 9, 2014)
7aac03a remove Scala parsers (mengxr, May 9, 2014)
810d6df update tokenizer/parser implementation (mengxr, May 14, 2014)
aea4ae3 minor updates (mengxr, May 14, 2014)
e9fcd49 add serializeLabeledPoint and tests (mengxr, May 14, 2014)
ce9a475 add deserialize_labeled_point to pyspark with tests (mengxr, May 14, 2014)
a41675a python loadLabeledPoint uses Scala's implementation (mengxr, May 14, 2014)
f644438 remove parse methods based on eval from pyspark (mengxr, May 14, 2014)
6155b75 merge master (mengxr, May 14, 2014)
050fca4 use StringTokenizer (mengxr, May 14, 2014)
e86bf38 remove NumericTokenizer (mengxr, May 14, 2014)
5bcfbc4 update test to add scientific notations (mengxr, May 14, 2014)
640fe0c throw SparkException (mengxr, May 14, 2014)
f06d5ba add docs and minor updates (mengxr, May 15, 2014)
623a5f0 merge master (mengxr, Jun 2, 2014)
56746ea replace # by . (mengxr, Jun 2, 2014)
d6b1473 Merge branch 'master' into labeled-io (mengxr, Jun 2, 2014)
297be75 change LabeledPoint.parse to LabeledPointParser.parse to maintain bin… (mengxr, Jun 3, 2014)
2d1116a make loadLabeledData/saveLabeledData deprecated since 1.0.1 (mengxr, Jun 3, 2014)
@@ -99,7 +99,7 @@ object DecisionTreeRunner {
val sc = new SparkContext(conf)

// Load training data and cache it.
val examples = MLUtils.loadLabeledData(sc, params.input).cache()
val examples = MLUtils.loadLabeledPoints(sc, params.input).cache()

val splits = examples.randomSplit(Array(0.8, 0.2))
val training = splits(0).cache()
@@ -20,12 +20,13 @@ package org.apache.spark.mllib.api.python
import java.nio.{ByteBuffer, ByteOrder}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
import org.apache.spark.mllib.recommendation._
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

/**
@@ -41,7 +42,7 @@ class PythonMLLibAPI extends Serializable {
private val DENSE_MATRIX_MAGIC: Byte = 3
private val LABELED_POINT_MAGIC: Byte = 4

private def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
require(bytes.length - offset >= 5, "Byte array too short")
val magic = bytes(offset)
if (magic == DENSE_VECTOR_MAGIC) {
@@ -116,7 +117,7 @@ class PythonMLLibAPI extends Serializable {
bytes
}

private def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
case s: SparseVector =>
serializeSparseVector(s)
case _ =>
@@ -167,7 +168,18 @@ class PythonMLLibAPI extends Serializable {
bytes
}

private def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
val fb = serializeDoubleVector(p.features)
val bytes = new Array[Byte](1 + 8 + fb.length)
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
bb.put(LABELED_POINT_MAGIC)
bb.putDouble(p.label)
bb.put(fb)
bytes
}

private[python] def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
require(bytes.length >= 9, "Byte array too short")
val magic = bytes(0)
if (magic != LABELED_POINT_MAGIC) {
@@ -179,6 +191,19 @@ class PythonMLLibAPI extends Serializable {
LabeledPoint(label, deserializeDoubleVector(bytes, 9))
}

/**
* Loads and serializes labeled points saved with `RDD#saveAsTextFile`.
* @param jsc Java SparkContext
* @param path file or directory path in any Hadoop-supported file system URI
* @param minPartitions min number of partitions
* @return serialized labeled points stored in a JavaRDD of byte array
*/
def loadLabeledPoints(
jsc: JavaSparkContext,
path: String,
minPartitions: Int): JavaRDD[Array[Byte]] =
MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint).toJavaRDD()

private def trainRegressionModel(
trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
dataBytesJRDD: JavaRDD[Array[Byte]],
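The labeled-point record built by serializeLabeledPoint above is a flat byte array: one LABELED_POINT_MAGIC byte (4, per the constant shown earlier in this file), the label as an 8-byte double in native byte order, and then whatever bytes serializeDoubleVector produced for the features. A minimal standalone sketch of reading the label back out of such a record; the helper object and method names are hypothetical, not part of the PR, and decoding the feature bytes is omitted because the vector encoding is defined elsewhere:

import java.nio.{ByteBuffer, ByteOrder}

object LabeledPointRecord {
  // Mirrors LABELED_POINT_MAGIC in PythonMLLibAPI.
  private val LabeledPointMagic: Byte = 4

  // Reads the label from a record produced by serializeLabeledPoint.
  // Byte 0: magic, bytes 1-8: label as a native-order double, bytes 9+: features.
  def readLabel(bytes: Array[Byte]): Double = {
    require(bytes.length >= 9, "Byte array too short")
    require(bytes(0) == LabeledPointMagic, "Not a labeled point record")
    val bb = ByteBuffer.wrap(bytes, 1, 8)
    bb.order(ByteOrder.nativeOrder())
    bb.getDouble
  }
}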
33 changes: 28 additions & 5 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -17,13 +17,16 @@

package org.apache.spark.mllib.linalg

import java.lang.{Iterable => JavaIterable, Integer => JavaInteger, Double => JavaDouble}
import java.lang.{Double => JavaDouble, Integer => JavaInteger, Iterable => JavaIterable}
import java.util.Arrays

import scala.annotation.varargs
import scala.collection.JavaConverters._

import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV}
import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}

import org.apache.spark.mllib.util.NumericParser
import org.apache.spark.SparkException

/**
* Represents a numeric vector, whose index type is Int and value type is Double.
@@ -124,6 +127,25 @@ object Vectors {
}.toSeq)
}

/**
* Parses a string resulted from `Vector#toString` into
* an [[org.apache.spark.mllib.linalg.Vector]].
*/
def parse(s: String): Vector = {
parseNumeric(NumericParser.parse(s))
}

private[mllib] def parseNumeric(any: Any): Vector = {
any match {
case values: Array[Double] =>
Vectors.dense(values)
case Seq(size: Double, indices: Array[Double], values: Array[Double]) =>
Vectors.sparse(size.toInt, indices.map(_.toInt), values)
case other =>
throw new SparkException(s"Cannot parse $other.")
}
}

/**
* Creates a vector instance from a breeze vector.
*/
@@ -175,9 +197,10 @@ class SparseVector(
val indices: Array[Int],
val values: Array[Double]) extends Vector {

override def toString: String = {
"(" + size + "," + indices.zip(values).mkString("[", "," ,"]") + ")"
}
require(indices.length == values.length)

override def toString: String =
"(%s,%s,%s)".format(size, indices.mkString("[", ",", "]"), values.mkString("[", ",", "]"))

override def toArray: Array[Double] = {
val data = new Array[Double](size)
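With the updated toString, a sparse vector now prints as "(size,[indices],[values])", and Vectors.parse reverses that via parseNumeric. A small round-trip sketch; the comments show the strings the format implies rather than captured output, and the dense example assumes NumericParser turns a bracketed list into an Array[Double], which the Array[Double] branch of parseNumeric implies:

import org.apache.spark.mllib.linalg.Vectors

// Sparse vector of size 4 with non-zeros at indices 1 and 3;
// its toString is "(4,[1,3],[3.5,-1.0])" under the format above.
val sv = Vectors.sparse(4, Array(1, 3), Array(3.5, -1.0))
val roundTripped = Vectors.parse(sv.toString)

// A bracketed list of values goes through the Array[Double] branch
// of parseNumeric and comes back as a dense vector.
val dv = Vectors.parse("[1.0,0.0,2.5]")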
@@ -17,7 +17,9 @@

package org.apache.spark.mllib.regression

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.util.NumericParser
import org.apache.spark.SparkException

/**
* Class that represents the features and labels of a data point.
@@ -27,6 +29,31 @@ import org.apache.spark.mllib.linalg.Vector
*/
case class LabeledPoint(label: Double, features: Vector) {
override def toString: String = {
"LabeledPoint(%s, %s)".format(label, features)
"(%s,%s)".format(label, features)
}
}

/**
* Parser for [[org.apache.spark.mllib.regression.LabeledPoint]].
*/
private[mllib] object LabeledPointParser {
/**
* Parses a string resulted from `LabeledPoint#toString` into
* an [[org.apache.spark.mllib.regression.LabeledPoint]].
*/
def parse(s: String): LabeledPoint = {
if (s.startsWith("(")) {
NumericParser.parse(s) match {
case Seq(label: Double, numeric: Any) =>
LabeledPoint(label, Vectors.parseNumeric(numeric))
case other =>
throw new SparkException(s"Cannot parse $other.")
}
} else { // dense format used before v1.0
Review comment (Contributor): Did we ever expose this to users before 1.0? I don't think we need compatibility with it.
Review comment (Contributor): Oh I see, we actually did; never mind.
val parts = s.split(',')
val label = java.lang.Double.parseDouble(parts(0))
val features = Vectors.dense(parts(1).trim().split(' ').map(java.lang.Double.parseDouble))
LabeledPoint(label, features)
}
}
}
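LabeledPointParser accepts both the new "(label,features)" form produced by the updated toString and the comma-separated dense form used before v1.0. A short sketch; since the parser is private[mllib], this assumes the snippet lives under the org.apache.spark.mllib package, and the sparse example additionally assumes NumericParser handles the nested "(size,[indices],[values])" form:

import org.apache.spark.mllib.regression.LabeledPointParser

// New format: "(label,features)", where features use Vector#toString.
val dense = LabeledPointParser.parse("(1.0,[1.0,0.0,3.0])")
val sparse = LabeledPointParser.parse("(0.0,(4,[1,3],[3.5,-1.0]))")

// Pre-1.0 dense format: "label, f1 f2 f3", kept for compatibility.
val legacy = LabeledPointParser.parse("-1.0, 1.5 0.0 3.2")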
@@ -129,7 +129,8 @@ object LinearDataGenerator {
val sc = new SparkContext(sparkMaster, "LinearDataGenerator")
val data = generateLinearRDD(sc, nexamples, nfeatures, eps, nparts = parts)

MLUtils.saveLabeledData(data, outputPath)
data.saveAsTextFile(outputPath)

sc.stop()
}
}
@@ -79,7 +79,8 @@ object LogisticRegressionDataGenerator {
val sc = new SparkContext(sparkMaster, "LogisticRegressionDataGenerator")
val data = generateLogisticRDD(sc, nexamples, nfeatures, eps, parts)

MLUtils.saveLabeledData(data, outputPath)
data.saveAsTextFile(outputPath)

sc.stop()
}
}
47 changes: 42 additions & 5 deletions mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PartitionwiseSampledRDD
import org.apache.spark.util.random.BernoulliSampler
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.storage.StorageLevel

@@ -180,7 +180,39 @@
}

/**
* :: Experimental ::
* Loads vectors saved using `RDD[Vector].saveAsTextFile`.
* @param sc Spark context
* @param path file or directory path in any Hadoop-supported file system URI
* @param minPartitions min number of partitions
* @return vectors stored as an RDD[Vector]
*/
def loadVectors(sc: SparkContext, path: String, minPartitions: Int): RDD[Vector] =
sc.textFile(path, minPartitions).map(Vectors.parse)

/**
* Loads vectors saved using `RDD[Vector].saveAsTextFile` with the default number of partitions.
*/
def loadVectors(sc: SparkContext, path: String): RDD[Vector] =
sc.textFile(path, sc.defaultMinPartitions).map(Vectors.parse)

/**
* Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile`.
* @param sc Spark context
* @param path file or directory path in any Hadoop-supported file system URI
* @param minPartitions min number of partitions
* @return labeled points stored as an RDD[LabeledPoint]
*/
def loadLabeledPoints(sc: SparkContext, path: String, minPartitions: Int): RDD[LabeledPoint] =
sc.textFile(path, minPartitions).map(LabeledPointParser.parse)

/**
* Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile` with the default number of
* partitions.
*/
def loadLabeledPoints(sc: SparkContext, dir: String): RDD[LabeledPoint] =
loadLabeledPoints(sc, dir, sc.defaultMinPartitions)

/**
* Load labeled data from a file. The data format used here is
* <L>, <f1> <f2> ...
* where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
@@ -189,8 +221,11 @@ object MLUtils {
* @param dir Directory to the input data files.
* @return An RDD of LabeledPoint. Each labeled point has two elements: the first element is
* the label, and the second element represents the feature values (an array of Double).
*
* @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and
* [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading.
*/
@Experimental
@deprecated("Should use MLUtils.loadLabeledPoints instead.", "1.0.1")
def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = {
sc.textFile(dir).map { line =>
val parts = line.split(',')
@@ -201,15 +236,17 @@
}

/**
* :: Experimental ::
* Save labeled data to a file. The data format used here is
* <L>, <f1> <f2> ...
* where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
*
* @param data An RDD of LabeledPoints containing data to be saved.
* @param dir Directory to save the data.
*
* @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and
* [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading.
*/
@Experimental
@deprecated("Should use RDD[LabeledPoint].saveAsTextFile instead.", "1.0.1")
def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" "))
dataStr.saveAsTextFile(dir)
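Taken together, the deprecated saveLabeledData/loadLabeledData pair gives way to plain RDD#saveAsTextFile plus the new loadLabeledPoints. A usage sketch under assumed local settings; the master, app name, and /tmp path are placeholders:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val sc = new SparkContext("local", "labeled-io-example")

val points: RDD[LabeledPoint] = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
  LabeledPoint(0.0, Vectors.sparse(3, Array(1), Array(2.5)))))

// Each point is written on its own line in the "(label,features)" text format.
points.saveAsTextFile("/tmp/labeled-points")

// Read the text back as LabeledPoints with the default number of partitions.
val loaded: RDD[LabeledPoint] = MLUtils.loadLabeledPoints(sc, "/tmp/labeled-points")

sc.stop()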