Commit d254be7

Merge branch 'master' into SPARK-6263

2 parents 62a9c7e + d70a076
26 files changed: +295 -53 lines

R/pkg/R/RDD.R

Lines changed: 2 additions & 2 deletions
@@ -67,8 +67,8 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
 })

 setMethod("show", "RDD",
-          function(.Object) {
-            cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep=""))
+          function(object) {
+            cat(paste(callJMethod(getJRDD(object), "toString"), "\n", sep=""))
 })

 setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {

docs/mllib-dimensionality-reduction.md

Lines changed: 18 additions & 1 deletion
@@ -137,7 +137,7 @@ statistical method to find a rotation such that the first coordinate has the lar
 possible, and each succeeding coordinate in turn has the largest variance possible. The columns of
 the rotation matrix are called principal components. PCA is used widely in dimensionality reduction.

-MLlib supports PCA for tall-and-skinny matrices stored in row-oriented format.
+MLlib supports PCA for tall-and-skinny matrices stored in row-oriented format and any Vectors.

 <div class="codetabs">
 <div data-lang="scala" markdown="1">
@@ -157,6 +157,23 @@ val pc: Matrix = mat.computePrincipalComponents(10) // Principal components are
 // Project the rows to the linear space spanned by the top 10 principal components.
 val projected: RowMatrix = mat.multiply(pc)
 {% endhighlight %}
+
+The following code demonstrates how to compute principal components on source vectors
+and use them to project the vectors into a low-dimensional space while keeping associated labels:
+
+{% highlight scala %}
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.feature.PCA
+
+val data: RDD[LabeledPoint] = ...
+
+// Compute the top 10 principal components.
+val pca = new PCA(10).fit(data.map(_.features))
+
+// Project vectors to the linear space spanned by the top 10 principal components, keeping the label
+val projected = data.map(p => p.copy(features = pca.transform(p.features)))
+{% endhighlight %}
+
 </div>

 <div data-lang="java" markdown="1">

docs/mllib-feature-extraction.md

Lines changed: 54 additions & 1 deletion
@@ -507,7 +507,6 @@ v_N

 This example below demonstrates how to load a simple vectors file, extract a set of vectors, then transform those vectors using a transforming vector value.

-
 <div class="codetabs">
 <div data-lang="scala">
 {% highlight scala %}
@@ -531,3 +530,57 @@ val transformedData2 = parsedData.map(x => transformer.transform(x))
 </div>


+## PCA
+
+A feature transformer that projects vectors to a low-dimensional space using PCA.
+Details can be found at [dimensionality reduction](mllib-dimensionality-reduction.html).
+
+### Example
+
+The following code demonstrates how to compute principal components on a `Vector`
+and use them to project the vectors into a low-dimensional space, keeping associated labels,
+for training a [linear regression](mllib-linear-methods.html) model:
+
+<div class="codetabs">
+<div data-lang="scala">
+{% highlight scala %}
+import org.apache.spark.mllib.regression.LinearRegressionWithSGD
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.feature.PCA
+
+val data = sc.textFile("data/mllib/ridge-data/lpsa.data").map { line =>
+  val parts = line.split(',')
+  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
+}.cache()
+
+val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
+val training = splits(0).cache()
+val test = splits(1)
+
+val pca = new PCA(training.first().features.size / 2).fit(data.map(_.features))
+val training_pca = training.map(p => p.copy(features = pca.transform(p.features)))
+val test_pca = test.map(p => p.copy(features = pca.transform(p.features)))
+
+val numIterations = 100
+val model = LinearRegressionWithSGD.train(training, numIterations)
+val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations)
+
+val valuesAndPreds = test.map { point =>
+  val score = model.predict(point.features)
+  (score, point.label)
+}
+
+val valuesAndPreds_pca = test_pca.map { point =>
+  val score = model_pca.predict(point.features)
+  (score, point.label)
+}
+
+val MSE = valuesAndPreds.map { case (v, p) => math.pow((v - p), 2) }.mean()
+val MSE_pca = valuesAndPreds_pca.map { case (v, p) => math.pow((v - p), 2) }.mean()
+
+println("Mean Squared Error = " + MSE)
+println("PCA Mean Squared Error = " + MSE_pca)
+{% endhighlight %}
+</div>
+</div>
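
Note: the hand-rolled MSE at the end of this example could equally be computed with MLlib's own `RegressionMetrics`. A sketch of that alternative, not part of the commit, reusing the `valuesAndPreds` pairs from above:

```scala
import org.apache.spark.mllib.evaluation.RegressionMetrics

// RegressionMetrics takes an RDD of (prediction, observation) pairs.
val metrics = new RegressionMetrics(valuesAndPreds)
val metrics_pca = new RegressionMetrics(valuesAndPreds_pca)

println("Mean Squared Error = " + metrics.meanSquaredError)
println("PCA Mean Squared Error = " + metrics_pca.meanSquaredError)
```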
mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala

Lines changed: 93 additions & 0 deletions

@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.feature
+
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.rdd.RDD
+
+/**
+ * A feature transformer that projects vectors to a low-dimensional space using PCA.
+ *
+ * @param k number of principal components
+ */
+class PCA(val k: Int) {
+  require(k >= 1, s"PCA requires a number of principal components k >= 1 but was given $k")
+
+  /**
+   * Computes a [[PCAModel]] that contains the principal components of the input vectors.
+   *
+   * @param sources source vectors
+   */
+  def fit(sources: RDD[Vector]): PCAModel = {
+    require(k <= sources.first().size,
+      s"source vector size ${sources.first().size} must be no less than k=$k")
+
+    val mat = new RowMatrix(sources)
+    val pc = mat.computePrincipalComponents(k) match {
+      case dm: DenseMatrix =>
+        dm
+      case sm: SparseMatrix =>
+        /* Convert a sparse matrix to dense.
+         *
+         * RowMatrix.computePrincipalComponents always returns a dense matrix.
+         * The following code is a safeguard.
+         */
+        sm.toDense
+      case m =>
+        throw new IllegalArgumentException("Unsupported matrix format. Expected " +
+          s"SparseMatrix or DenseMatrix. Instead got: ${m.getClass}")
+    }
+    new PCAModel(k, pc)
+  }
+
+  /** Java-friendly version of [[fit()]] */
+  def fit(sources: JavaRDD[Vector]): PCAModel = fit(sources.rdd)
+}
+
+/**
+ * Model fitted by [[PCA]] that can project vectors to a low-dimensional space using PCA.
+ *
+ * @param k number of principal components.
+ * @param pc a principal components Matrix. Each column is one principal component.
+ */
+class PCAModel private[mllib] (val k: Int, val pc: DenseMatrix) extends VectorTransformer {
+  /**
+   * Transform a vector by computed Principal Components.
+   *
+   * @param vector vector to be transformed.
+   *               Vector must be the same length as the source vectors given to [[PCA.fit()]].
+   * @return transformed vector. Vector will be of length k.
+   */
+  override def transform(vector: Vector): Vector = {
+    vector match {
+      case dv: DenseVector =>
+        pc.transpose.multiply(dv)
+      case SparseVector(size, indices, values) =>
+        /* SparseVector -> single row SparseMatrix */
+        val sm = Matrices.sparse(size, 1, Array(0, indices.length), indices, values).transpose
+        val projection = sm.multiply(pc)
+        Vectors.dense(projection.values)
+      case _ =>
+        throw new IllegalArgumentException("Unsupported vector format. Expected " +
+          s"SparseVector or DenseVector. Instead got: ${vector.getClass}")
+    }
+  }
+}
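
Note: a minimal usage sketch of the new API, showing that `PCAModel.transform` accepts both dense and sparse inputs and that the RDD-level overload is inherited from `VectorTransformer`. The toy vectors and the `SparkContext` named `sc` are assumptions of this sketch:

```scala
import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vectors

// Three 4-dimensional source vectors; `sc` is an existing SparkContext (assumption).
val sources = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 2.0, 0.0),
  Vectors.dense(0.0, 3.0, 0.0, 1.0),
  Vectors.sparse(4, Seq((0, 5.0), (3, 2.0)))))

// k = 2 satisfies the require(k <= source vector size) check in fit().
val model = new PCA(2).fit(sources)

// Per-vector transform returns a length-k vector, dense or sparse input alike.
val one = model.transform(Vectors.dense(1.0, 1.0, 1.0, 1.0))

// RDD-level transform maps the same projection over every vector.
val all = model.transform(sources)
```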
mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala

Lines changed: 48 additions & 0 deletions

@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.feature
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+
+class PCASuite extends FunSuite with MLlibTestSparkContext {
+
+  private val data = Array(
+    Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
+    Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
+    Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
+  )
+
+  private lazy val dataRDD = sc.parallelize(data, 2)
+
+  test("correct computation using the PCA wrapper") {
+    val k = dataRDD.count().toInt
+    val pca = new PCA(k).fit(dataRDD)
+
+    val mat = new RowMatrix(dataRDD)
+    val pc = mat.computePrincipalComponents(k)
+
+    val pca_transform = pca.transform(dataRDD).collect()
+    val mat_multiply = mat.multiply(pc).rows.collect()
+
+    assert(pca_transform.toSet === mat_multiply.toSet)
+  }
+}
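
Note: the suite's single assertion rests on a small linear-algebra identity worth spelling out: per-vector projection and distributed matrix multiplication compute the same thing. With x_i the i-th source vector (a row of the RowMatrix A) and P the n-by-k principal-components matrix:

```latex
\mathrm{transform}(x_i) = P^{\top} x_i,
\qquad
(AP)_{i,\cdot} = x_i^{\top} P = \left(P^{\top} x_i\right)^{\top}
```

So collecting `mat.multiply(pc).rows` and mapping `pca.transform` over the RDD yield the same set of projected vectors, up to row ordering, which is why the test compares with `toSet`.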

python/pyspark/ml/param/_shared_params_code_gen.py

Lines changed: 3 additions & 3 deletions
@@ -88,12 +88,12 @@ def get$Name(self):
     print("\n# DO NOT MODIFY THIS FILE! It was generated by _shared_params_code_gen.py.\n")
     print("from pyspark.ml.param import Param, Params\n\n")
     shared = [
-        ("maxIter", "max number of iterations", None),
-        ("regParam", "regularization constant", None),
+        ("maxIter", "max number of iterations (>= 0)", None),
+        ("regParam", "regularization parameter (>= 0)", None),
         ("featuresCol", "features column name", "'features'"),
         ("labelCol", "label column name", "'label'"),
         ("predictionCol", "prediction column name", "'prediction'"),
-        ("rawPredictionCol", "raw prediction column name", "'rawPrediction'"),
+        ("rawPredictionCol", "raw prediction (a.k.a. confidence) column name", "'rawPrediction'"),
         ("inputCol", "input column name", None),
         ("inputCols", "input column names", None),
         ("outputCol", "output column name", None),

python/pyspark/ml/param/shared.py

Lines changed: 14 additions & 16 deletions
@@ -22,16 +22,16 @@

 class HasMaxIter(Params):
     """
-    Mixin for param maxIter: max number of iterations.
+    Mixin for param maxIter: max number of iterations (>= 0).
     """

     # a placeholder to make it appear in the generated doc
-    maxIter = Param(Params._dummy(), "maxIter", "max number of iterations")
+    maxIter = Param(Params._dummy(), "maxIter", "max number of iterations (>= 0)")

     def __init__(self):
         super(HasMaxIter, self).__init__()
-        #: param for max number of iterations
-        self.maxIter = Param(self, "maxIter", "max number of iterations")
+        #: param for max number of iterations (>= 0)
+        self.maxIter = Param(self, "maxIter", "max number of iterations (>= 0)")
         if None is not None:
             self._setDefault(maxIter=None)

@@ -51,16 +51,16 @@ def getMaxIter(self):

 class HasRegParam(Params):
     """
-    Mixin for param regParam: regularization constant.
+    Mixin for param regParam: regularization parameter (>= 0).
     """

     # a placeholder to make it appear in the generated doc
-    regParam = Param(Params._dummy(), "regParam", "regularization constant")
+    regParam = Param(Params._dummy(), "regParam", "regularization parameter (>= 0)")

     def __init__(self):
         super(HasRegParam, self).__init__()
-        #: param for regularization constant
-        self.regParam = Param(self, "regParam", "regularization constant")
+        #: param for regularization parameter (>= 0)
+        self.regParam = Param(self, "regParam", "regularization parameter (>= 0)")
         if None is not None:
             self._setDefault(regParam=None)

@@ -167,16 +167,16 @@ def getPredictionCol(self):

 class HasRawPredictionCol(Params):
     """
-    Mixin for param rawPredictionCol: raw prediction column name.
+    Mixin for param rawPredictionCol: raw prediction (a.k.a. confidence) column name.
     """

     # a placeholder to make it appear in the generated doc
-    rawPredictionCol = Param(Params._dummy(), "rawPredictionCol", "raw prediction column name")
+    rawPredictionCol = Param(Params._dummy(), "rawPredictionCol", "raw prediction (a.k.a. confidence) column name")

     def __init__(self):
         super(HasRawPredictionCol, self).__init__()
-        #: param for raw prediction column name
-        self.rawPredictionCol = Param(self, "rawPredictionCol", "raw prediction column name")
+        #: param for raw prediction (a.k.a. confidence) column name
+        self.rawPredictionCol = Param(self, "rawPredictionCol", "raw prediction (a.k.a. confidence) column name")
         if 'rawPrediction' is not None:
             self._setDefault(rawPredictionCol='rawPrediction')

@@ -403,14 +403,12 @@ class HasStepSize(Params):
     """

     # a placeholder to make it appear in the generated doc
-    stepSize = Param(Params._dummy(), "stepSize",
-                     "Step size to be used for each iteration of optimization.")
+    stepSize = Param(Params._dummy(), "stepSize", "Step size to be used for each iteration of optimization.")

     def __init__(self):
         super(HasStepSize, self).__init__()
         #: param for Step size to be used for each iteration of optimization.
-        self.stepSize = Param(self, "stepSize",
-                              "Step size to be used for each iteration of optimization.")
+        self.stepSize = Param(self, "stepSize", "Step size to be used for each iteration of optimization.")
         if None is not None:
             self._setDefault(stepSize=None)

python/pyspark/ml/pipeline.py

Lines changed: 1 addition & 1 deletion
@@ -179,7 +179,7 @@ def transform(self, dataset, params={}):
         return dataset


-class Evaluator(object):
+class Evaluator(Params):
     """
     Base class for evaluators that compute metrics from predictions.
     """

python/pyspark/ml/tests.py

Lines changed: 4 additions & 4 deletions
@@ -34,7 +34,7 @@
 from pyspark.sql import DataFrame
 from pyspark.ml.param import Param
 from pyspark.ml.param.shared import HasMaxIter, HasInputCol
-from pyspark.ml.pipeline import Transformer, Estimator, Pipeline
+from pyspark.ml.pipeline import Estimator, Model, Pipeline, Transformer


 class MockDataset(DataFrame):
@@ -77,7 +77,7 @@ def fit(self, dataset, params={}):
         return model


-class MockModel(MockTransformer, Transformer):
+class MockModel(MockTransformer, Model):

     def __init__(self):
         super(MockModel, self).__init__()
@@ -128,7 +128,7 @@ def test_param(self):
         testParams = TestParams()
         maxIter = testParams.maxIter
         self.assertEqual(maxIter.name, "maxIter")
-        self.assertEqual(maxIter.doc, "max number of iterations")
+        self.assertEqual(maxIter.doc, "max number of iterations (>= 0)")
         self.assertTrue(maxIter.parent is testParams)

     def test_params(self):
@@ -156,7 +156,7 @@ def test_params(self):
         self.assertEquals(
             testParams.explainParams(),
             "\n".join(["inputCol: input column name (undefined)",
-                       "maxIter: max number of iterations (default: 10, current: 100)"]))
+                       "maxIter: max number of iterations (>= 0) (default: 10, current: 100)"]))


 if __name__ == "__main__":
