
Commit 189df16

mengxr authored and mateiz committed
[SPARK-1752][MLLIB] Standardize text format for vectors and labeled points
We should standardize the text format used to represent vectors and labeled points. The proposed formats are:

1. dense vector: `[v0,v1,..]`
2. sparse vector: `(size,[i0,i1],[v0,v1])`
3. labeled point: `(label,vector)`

where `(...)` indicates a tuple and `[...]` indicates an array.

`loadLabeledPoints` is added to pyspark's `MLUtils`. I didn't add `loadVectors` to pyspark because `RDD.saveAsTextFile` cannot stringify dense vectors in the proposed format automatically.

`MLUtils#saveLabeledData` and `MLUtils#loadLabeledData` are deprecated. Users should use `RDD#saveAsTextFile` and `MLUtils#loadLabeledPoints` instead. In Scala, `MLUtils#loadLabeledPoints` is compatible with the format used by `MLUtils#loadLabeledData`.

CC: @mateiz, @srowen

Author: Xiangrui Meng <[email protected]>

Closes apache#685 from mengxr/labeled-io and squashes the following commits:

2d1116a [Xiangrui Meng] make loadLabeledData/saveLabeledData deprecated since 1.0.1
297be75 [Xiangrui Meng] change LabeledPoint.parse to LabeledPointParser.parse to maintain binary compatibility
d6b1473 [Xiangrui Meng] Merge branch 'master' into labeled-io
56746ea [Xiangrui Meng] replace # by .
623a5f0 [Xiangrui Meng] merge master
f06d5ba [Xiangrui Meng] add docs and minor updates
640fe0c [Xiangrui Meng] throw SparkException
5bcfbc4 [Xiangrui Meng] update test to add scientific notations
e86bf38 [Xiangrui Meng] remove NumericTokenizer
050fca4 [Xiangrui Meng] use StringTokenizer
6155b75 [Xiangrui Meng] merge master
f644438 [Xiangrui Meng] remove parse methods based on eval from pyspark
a41675a [Xiangrui Meng] python loadLabeledPoint uses Scala's implementation
ce9a475 [Xiangrui Meng] add deserialize_labeled_point to pyspark with tests
e9fcd49 [Xiangrui Meng] add serializeLabeledPoint and tests
aea4ae3 [Xiangrui Meng] minor updates
810d6df [Xiangrui Meng] update tokenizer/parser implementation
7aac03a [Xiangrui Meng] remove Scala parsers
c1885c1 [Xiangrui Meng] add headers and minor changes
b0c50cb [Xiangrui Meng] add customized parser
d731817 [Xiangrui Meng] style update
63dc396 [Xiangrui Meng] add loadLabeledPoints to pyspark
ea122b5 [Xiangrui Meng] Merge branch 'master' into labeled-io
cd6c78f [Xiangrui Meng] add __str__ and parse to LabeledPoint
a7a178e [Xiangrui Meng] add stringify to pyspark's Vectors
5c2dbfa [Xiangrui Meng] add parse to pyspark's Vectors
7853f88 [Xiangrui Meng] update pyspark's SparseVector.__str__
e761d32 [Xiangrui Meng] make LabelPoint.parse compatible with the dense format used before v1.0 and deprecate loadLabeledData and saveLabeledData
9e63a02 [Xiangrui Meng] add loadVectors and loadLabeledPoints
19aa523 [Xiangrui Meng] update toString and add parsers for Vectors and LabeledPoint
1 parent d341b17 commit 189df16
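For readers skimming the change, the following is a minimal sketch (not part of the commit; the numeric values are invented) of what the standardized strings look like and how a vector round-trips through the new `Vectors.parse`:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Illustrative values only; outputs in the comments follow the formats above.
object FormatSketch {
  def main(args: Array[String]): Unit = {
    val dense = Vectors.dense(1.0, 0.0, 3.0)
    println(dense)                                   // [1.0,0.0,3.0]

    val sparse = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
    println(sparse)                                  // (3,[0,2],[1.0,3.0])

    println(LabeledPoint(1.0, sparse))               // (1.0,(3,[0,2],[1.0,3.0]))

    // Vectors.parse reverses Vector#toString; labeled-point strings are parsed
    // internally (LabeledPointParser is private[mllib]) by MLUtils.loadLabeledPoints.
    val roundTrip = Vectors.parse(sparse.toString)
    println(roundTrip.toString == sparse.toString)   // true
  }
}
```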

File tree

18 files changed: +579 -72 lines changed

examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala

Lines changed: 1 addition & 1 deletion

@@ -99,7 +99,7 @@ object DecisionTreeRunner {
     val sc = new SparkContext(conf)
 
     // Load training data and cache it.
-    val examples = MLUtils.loadLabeledData(sc, params.input).cache()
+    val examples = MLUtils.loadLabeledPoints(sc, params.input).cache()
 
     val splits = examples.randomSplit(Array(0.8, 0.2))
     val training = splits(0).cache()

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

Lines changed: 29 additions & 4 deletions

@@ -20,12 +20,13 @@ package org.apache.spark.mllib.api.python
 import java.nio.{ByteBuffer, ByteOrder}
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
 import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 
 /**
@@ -41,7 +42,7 @@ class PythonMLLibAPI extends Serializable {
   private val DENSE_MATRIX_MAGIC: Byte = 3
   private val LABELED_POINT_MAGIC: Byte = 4
 
-  private def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
+  private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
     require(bytes.length - offset >= 5, "Byte array too short")
     val magic = bytes(offset)
     if (magic == DENSE_VECTOR_MAGIC) {
@@ -116,7 +117,7 @@ class PythonMLLibAPI extends Serializable {
     bytes
   }
 
-  private def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
+  private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
     case s: SparseVector =>
       serializeSparseVector(s)
     case _ =>
@@ -167,7 +168,18 @@ class PythonMLLibAPI extends Serializable {
     bytes
   }
 
-  private def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
+  private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
+    val fb = serializeDoubleVector(p.features)
+    val bytes = new Array[Byte](1 + 8 + fb.length)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.put(LABELED_POINT_MAGIC)
+    bb.putDouble(p.label)
+    bb.put(fb)
+    bytes
+  }
+
+  private[python] def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
     require(bytes.length >= 9, "Byte array too short")
     val magic = bytes(0)
     if (magic != LABELED_POINT_MAGIC) {
@@ -179,6 +191,19 @@ class PythonMLLibAPI extends Serializable {
     LabeledPoint(label, deserializeDoubleVector(bytes, 9))
   }
 
+  /**
+   * Loads and serializes labeled points saved with `RDD#saveAsTextFile`.
+   * @param jsc Java SparkContext
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @param minPartitions min number of partitions
+   * @return serialized labeled points stored in a JavaRDD of byte array
+   */
+  def loadLabeledPoints(
+      jsc: JavaSparkContext,
+      path: String,
+      minPartitions: Int): JavaRDD[Array[Byte]] =
+    MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint).toJavaRDD()
+
   private def trainRegressionModel(
       trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
       dataBytesJRDD: JavaRDD[Array[Byte]],
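As context for the Python bridge above: `serializeLabeledPoint` writes a flat layout of `[1-byte LABELED_POINT_MAGIC][8-byte label in native byte order][serialized feature vector]`. The helper below is a hypothetical sketch, not part of the commit, showing how the label sits in that layout:

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical helper (name invented): reads the label back out of the byte
// layout produced by PythonMLLibAPI.serializeLabeledPoint.
object LabeledPointBytesSketch {
  def readLabel(bytes: Array[Byte]): Double = {
    require(bytes.length >= 9, "Byte array too short") // 1-byte magic + 8-byte label
    val bb = ByteBuffer.wrap(bytes, 1, 8)              // skip the magic byte
    bb.order(ByteOrder.nativeOrder())                  // match the writer's byte order
    bb.getDouble()
  }
}
```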

mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala

Lines changed: 28 additions & 5 deletions

@@ -17,13 +17,16 @@
 
 package org.apache.spark.mllib.linalg
 
-import java.lang.{Iterable => JavaIterable, Integer => JavaInteger, Double => JavaDouble}
+import java.lang.{Double => JavaDouble, Integer => JavaInteger, Iterable => JavaIterable}
 import java.util.Arrays
 
 import scala.annotation.varargs
 import scala.collection.JavaConverters._
 
-import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV}
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
+
+import org.apache.spark.mllib.util.NumericParser
+import org.apache.spark.SparkException
 
 /**
  * Represents a numeric vector, whose index type is Int and value type is Double.
@@ -124,6 +127,25 @@ object Vectors {
     }.toSeq)
   }
 
+  /**
+   * Parses a string resulted from `Vector#toString` into
+   * an [[org.apache.spark.mllib.linalg.Vector]].
+   */
+  def parse(s: String): Vector = {
+    parseNumeric(NumericParser.parse(s))
+  }
+
+  private[mllib] def parseNumeric(any: Any): Vector = {
+    any match {
+      case values: Array[Double] =>
+        Vectors.dense(values)
+      case Seq(size: Double, indices: Array[Double], values: Array[Double]) =>
+        Vectors.sparse(size.toInt, indices.map(_.toInt), values)
+      case other =>
+        throw new SparkException(s"Cannot parse $other.")
+    }
+  }
+
   /**
    * Creates a vector instance from a breeze vector.
    */
@@ -175,9 +197,10 @@ class SparseVector(
     val indices: Array[Int],
     val values: Array[Double]) extends Vector {
 
-  override def toString: String = {
-    "(" + size + "," + indices.zip(values).mkString("[", "," ,"]") + ")"
-  }
+  require(indices.length == values.length)
+
+  override def toString: String =
+    "(%s,%s,%s)".format(size, indices.mkString("[", ",", "]"), values.mkString("[", ",", "]"))
 
   override def toArray: Array[Double] = {
     val data = new Array[Double](size)

mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala

Lines changed: 29 additions & 2 deletions

@@ -17,7 +17,9 @@
 
 package org.apache.spark.mllib.regression
 
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.mllib.util.NumericParser
+import org.apache.spark.SparkException
 
 /**
  * Class that represents the features and labels of a data point.
@@ -27,6 +29,31 @@ import org.apache.spark.mllib.linalg.Vector
  */
 case class LabeledPoint(label: Double, features: Vector) {
   override def toString: String = {
-    "LabeledPoint(%s, %s)".format(label, features)
+    "(%s,%s)".format(label, features)
+  }
+}
+
+/**
+ * Parser for [[org.apache.spark.mllib.regression.LabeledPoint]].
+ */
+private[mllib] object LabeledPointParser {
+  /**
+   * Parses a string resulted from `LabeledPoint#toString` into
+   * an [[org.apache.spark.mllib.regression.LabeledPoint]].
+   */
+  def parse(s: String): LabeledPoint = {
+    if (s.startsWith("(")) {
+      NumericParser.parse(s) match {
+        case Seq(label: Double, numeric: Any) =>
+          LabeledPoint(label, Vectors.parseNumeric(numeric))
+        case other =>
+          throw new SparkException(s"Cannot parse $other.")
+      }
+    } else { // dense format used before v1.0
+      val parts = s.split(',')
+      val label = java.lang.Double.parseDouble(parts(0))
+      val features = Vectors.dense(parts(1).trim().split(' ').map(java.lang.Double.parseDouble))
+      LabeledPoint(label, features)
+    }
   }
 }
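The fallback branch above keeps old data readable: strings in the pre-1.0 dense format still parse. A small sketch (values invented; `LabeledPointParser` is `private[mllib]`, so this only compiles inside the mllib package tree, e.g. in a test):

```scala
package org.apache.spark.mllib.regression

// Hedged sketch, not part of the commit: both input strings below are
// illustrative and yield the same labeled point.
object ParseCompatSketch {
  def main(args: Array[String]): Unit = {
    val current = LabeledPointParser.parse("(1.0,[0.5,2.0,3.0])") // v1.0+ format: (label,vector)
    val legacy  = LabeledPointParser.parse("1.0,0.5 2.0 3.0")     // pre-1.0 format: label,f1 f2 ...
    println(current)  // (1.0,[0.5,2.0,3.0])
    println(legacy)   // (1.0,[0.5,2.0,3.0])
  }
}
```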

mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala

Lines changed: 2 additions & 1 deletion

@@ -129,7 +129,8 @@ object LinearDataGenerator {
     val sc = new SparkContext(sparkMaster, "LinearDataGenerator")
     val data = generateLinearRDD(sc, nexamples, nfeatures, eps, nparts = parts)
 
-    MLUtils.saveLabeledData(data, outputPath)
+    data.saveAsTextFile(outputPath)
+
     sc.stop()
   }
 }

mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala

Lines changed: 2 additions & 1 deletion

@@ -79,7 +79,8 @@ object LogisticRegressionDataGenerator {
     val sc = new SparkContext(sparkMaster, "LogisticRegressionDataGenerator")
     val data = generateLogisticRDD(sc, nexamples, nfeatures, eps, parts)
 
-    MLUtils.saveLabeledData(data, outputPath)
+    data.saveAsTextFile(outputPath)
+
     sc.stop()
  }
}

mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala

Lines changed: 42 additions & 5 deletions

@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PartitionwiseSampledRDD
 import org.apache.spark.util.random.BernoulliSampler
-import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint}
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.storage.StorageLevel
 
@@ -180,7 +180,39 @@ object MLUtils {
   }
 
   /**
-   * :: Experimental ::
+   * Loads vectors saved using `RDD[Vector].saveAsTextFile`.
+   * @param sc Spark context
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @param minPartitions min number of partitions
+   * @return vectors stored as an RDD[Vector]
+   */
+  def loadVectors(sc: SparkContext, path: String, minPartitions: Int): RDD[Vector] =
+    sc.textFile(path, minPartitions).map(Vectors.parse)
+
+  /**
+   * Loads vectors saved using `RDD[Vector].saveAsTextFile` with the default number of partitions.
+   */
+  def loadVectors(sc: SparkContext, path: String): RDD[Vector] =
+    sc.textFile(path, sc.defaultMinPartitions).map(Vectors.parse)
+
+  /**
+   * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile`.
+   * @param sc Spark context
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @param minPartitions min number of partitions
+   * @return labeled points stored as an RDD[LabeledPoint]
+   */
+  def loadLabeledPoints(sc: SparkContext, path: String, minPartitions: Int): RDD[LabeledPoint] =
+    sc.textFile(path, minPartitions).map(LabeledPointParser.parse)
+
+  /**
+   * Loads labeled points saved using `RDD[LabeledPoint].saveAsTextFile` with the default number of
+   * partitions.
+   */
+  def loadLabeledPoints(sc: SparkContext, dir: String): RDD[LabeledPoint] =
+    loadLabeledPoints(sc, dir, sc.defaultMinPartitions)
+
+  /**
    * Load labeled data from a file. The data format used here is
    * <L>, <f1> <f2> ...
    * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
@@ -189,8 +221,11 @@
   * @param dir Directory to the input data files.
   * @return An RDD of LabeledPoint. Each labeled point has two elements: the first element is
   *         the label, and the second element represents the feature values (an array of Double).
+   *
+   * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and
+   *             [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading.
   */
-  @Experimental
+  @deprecated("Should use MLUtils.loadLabeledPoints instead.", "1.0.1")
  def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = {
    sc.textFile(dir).map { line =>
      val parts = line.split(',')
@@ -201,15 +236,17 @@
   }
 
   /**
-   * :: Experimental ::
    * Save labeled data to a file. The data format used here is
    * <L>, <f1> <f2> ...
    * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
   *
   * @param data An RDD of LabeledPoints containing data to be saved.
   * @param dir Directory to save the data.
+   *
+   * @deprecated Should use [[org.apache.spark.rdd.RDD#saveAsTextFile]] for saving and
+   *             [[org.apache.spark.mllib.util.MLUtils#loadLabeledPoints]] for loading.
   */
-  @Experimental
+  @deprecated("Should use RDD[LabeledPoint].saveAsTextFile instead.", "1.0.1")
  def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
    val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" "))
    dataStr.saveAsTextFile(dir)
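Putting the deprecations together, here is a minimal migration sketch (placeholder master, app name, and path; not part of the commit) from the old save/load helpers to `RDD#saveAsTextFile` plus `MLUtils.loadLabeledPoints`:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.util.MLUtils

// Assumed setup; master, app name, and path are placeholders for illustration.
object MigrationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "migration-sketch")
    val path = "data/labeled_points" // any Hadoop-supported URI works here

    // Before (deprecated since 1.0.1):
    //   MLUtils.saveLabeledData(data, path)
    //   val loaded = MLUtils.loadLabeledData(sc, path)

    // After: save with RDD#saveAsTextFile (LabeledPoint.toString now emits
    // "(label,vector)") and load with MLUtils.loadLabeledPoints, which also
    // still reads the old "label,f1 f2 ..." files.
    val loaded = MLUtils.loadLabeledPoints(sc, path)
    println(loaded.count())

    sc.stop()
  }
}
```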

0 commit comments