mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.feature

import scala.collection.mutable.ArrayBuilder

import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics
@@ -31,8 +31,10 @@ import org.apache.spark.rdd.RDD
*
* @param selectedFeatures list of indices to select (filter). Must be ordered asc
*/
@Since("1.3.0")
@Experimental
class ChiSqSelectorModel (val selectedFeatures: Array[Int]) extends VectorTransformer {
class ChiSqSelectorModel (
@Since("1.3.0") val selectedFeatures: Array[Int]) extends VectorTransformer {

require(isSorted(selectedFeatures), "Array has to be sorted asc")

@@ -52,6 +54,7 @@ class ChiSqSelectorModel (val selectedFeatures: Array[Int]) extends VectorTransf
* @param vector vector to be transformed.
* @return transformed vector.
*/
@Since("1.3.0")
override def transform(vector: Vector): Vector = {
compress(vector, selectedFeatures)
}
@@ -107,8 +110,10 @@ class ChiSqSelectorModel (val selectedFeatures: Array[Int]) extends VectorTransf
* @param numTopFeatures number of features that selector will select
* (ordered by statistic value descending)
*/
@Since("1.3.0")
@Experimental
class ChiSqSelector (val numTopFeatures: Int) extends Serializable {
class ChiSqSelector (
@Since("1.3.0") val numTopFeatures: Int) extends Serializable {

/**
* Returns a ChiSquared feature selector.
@@ -117,6 +122,7 @@ class ChiSqSelector (val numTopFeatures: Int) extends Serializable {
* Real-valued features will be treated as categorical for each distinct value.
* Apply feature discretizer before using this function.
*/
@Since("1.3.0")
def fit(data: RDD[LabeledPoint]): ChiSqSelectorModel = {
val indices = Statistics.chiSqTest(data)
.zipWithIndex.sortBy { case (res, _) => -res.statistic }
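For context, a minimal sketch of how the annotated ChiSqSelector entry points are used. It assumes an existing SparkContext `sc`; the three labeled points are made-up categorical data.

import org.apache.spark.mllib.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Toy data; the chi-squared test treats each distinct feature value as a category.
val data = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(8.0, 7.0, 0.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 9.0, 6.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 9.0, 8.0))))

val selector = new ChiSqSelector(numTopFeatures = 1)
val model = selector.fit(data)                 // ChiSqSelectorModel
val filtered = data.map(lp => LabeledPoint(lp.label, model.transform(lp.features)))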
mllib/src/main/scala/org/apache/spark/mllib/feature/ElementwiseProduct.scala
@@ -17,7 +17,7 @@

package org.apache.spark.mllib.feature

import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.mllib.linalg._

/**
@@ -27,6 +27,7 @@ import org.apache.spark.mllib.linalg._
* multiplier.
* @param scalingVec The values used to scale the reference vector's individual components.
*/
@Since("1.4.0")
@Experimental
class ElementwiseProduct(val scalingVec: Vector) extends VectorTransformer {

@@ -36,6 +37,7 @@ class ElementwiseProduct(val scalingVec: Vector) extends VectorTransformer {
* @param vector vector to be transformed.
* @return transformed vector.
*/
@Since("1.4.0")
override def transform(vector: Vector): Vector = {
require(vector.size == scalingVec.size,
s"vector sizes do not match: Expected ${scalingVec.size} but found ${vector.size}")
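A short sketch of the ElementwiseProduct API annotated above; the scaling values are arbitrary.

import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors

val transformer = new ElementwiseProduct(Vectors.dense(0.0, 1.0, 2.0))
// Component-wise (Hadamard) product with the scaling vector: [1*0, 2*1, 3*2]
val out = transformer.transform(Vectors.dense(1.0, 2.0, 3.0))   // [0.0, 2.0, 6.0]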
mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala
@@ -22,7 +22,7 @@ import java.lang.{Iterable => JavaIterable}
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
@@ -34,19 +34,25 @@ import org.apache.spark.util.Utils
*
* @param numFeatures number of features (default: 2^20^)
*/
@Since("1.1.0")
@Experimental
class HashingTF(val numFeatures: Int) extends Serializable {

/**
*/
@Since("1.1.0")
def this() = this(1 << 20)

/**
* Returns the index of the input term.
*/
@Since("1.1.0")
def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures)

/**
* Transforms the input document into a sparse term frequency vector.
*/
@Since("1.1.0")
def transform(document: Iterable[_]): Vector = {
val termFrequencies = mutable.HashMap.empty[Int, Double]
document.foreach { term =>
@@ -59,20 +65,23 @@ class HashingTF(val numFeatures: Int) extends Serializable {
/**
* Transforms the input document into a sparse term frequency vector (Java version).
*/
@Since("1.1.0")
def transform(document: JavaIterable[_]): Vector = {
transform(document.asScala)
}

/**
* Transforms the input document to term frequency vectors.
*/
@Since("1.1.0")
def transform[D <: Iterable[_]](dataset: RDD[D]): RDD[Vector] = {
dataset.map(this.transform)
}

/**
* Transforms the input document to term frequency vectors (Java version).
*/
@Since("1.1.0")
def transform[D <: JavaIterable[_]](dataset: JavaRDD[D]): JavaRDD[Vector] = {
dataset.rdd.map(this.transform).toJavaRDD()
}
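A quick sketch of the HashingTF methods that receive @Since("1.1.0") above; the document contents are made up.

import org.apache.spark.mllib.feature.HashingTF

val hashingTF = new HashingTF()          // no-arg constructor: 2^20 features
val doc = Seq("spark", "mllib", "spark")
val tf = hashingTF.transform(doc)        // sparse term-frequency vector
val bucket = hashingTF.indexOf("spark")  // hash bucket used for the term "spark"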
mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.feature

import breeze.linalg.{DenseVector => BDV}

import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.rdd.RDD
@@ -37,6 +37,7 @@ import org.apache.spark.rdd.RDD
* @param minDocFreq minimum of documents in which a term
* should appear for filtering
*/
@Since("1.1.0")
@Experimental
class IDF(val minDocFreq: Int) {

@@ -48,6 +49,7 @@ class IDF(val minDocFreq: Int) {
* Computes the inverse document frequency.
* @param dataset an RDD of term frequency vectors
*/
@Since("1.1.0")
def fit(dataset: RDD[Vector]): IDFModel = {
val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator(
minDocFreq = minDocFreq))(
@@ -61,6 +63,7 @@ class IDF(val minDocFreq: Int) {
* Computes the inverse document frequency.
* @param dataset a JavaRDD of term frequency vectors
*/
@Since("1.1.0")
def fit(dataset: JavaRDD[Vector]): IDFModel = {
fit(dataset.rdd)
}
@@ -171,6 +174,7 @@ class IDFModel private[spark] (val idf: Vector) extends Serializable {
* @param dataset an RDD of term frequency vectors
* @return an RDD of TF-IDF vectors
*/
@Since("1.1.0")
def transform(dataset: RDD[Vector]): RDD[Vector] = {
val bcIdf = dataset.context.broadcast(idf)
dataset.mapPartitions(iter => iter.map(v => IDFModel.transform(bcIdf.value, v)))
@@ -182,13 +186,15 @@ class IDFModel private[spark] (val idf: Vector) extends Serializable {
* @param v a term frequency vector
* @return a TF-IDF vector
*/
@Since("1.3.0")
def transform(v: Vector): Vector = IDFModel.transform(idf, v)

/**
* Transforms term frequency (TF) vectors to TF-IDF vectors (Java version).
* @param dataset a JavaRDD of term frequency vectors
* @return a JavaRDD of TF-IDF vectors
*/
@Since("1.1.0")
def transform(dataset: JavaRDD[Vector]): JavaRDD[Vector] = {
transform(dataset.rdd).toJavaRDD()
}
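The usual TF-IDF pipeline, as a sketch of the annotated IDF and IDFModel methods. It assumes an existing SparkContext `sc` and a tokenized corpus docs: RDD[Seq[String]].

import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

val hashingTF = new HashingTF()
val tf: RDD[Vector] = docs.map(d => hashingTF.transform(d))
tf.cache()                                       // fit and transform both traverse tf
val idfModel = new IDF(minDocFreq = 2).fit(tf)   // ignore terms seen in fewer than 2 documents
val tfidf: RDD[Vector] = idfModel.transform(tf)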
mllib/src/main/scala/org/apache/spark/mllib/feature/Normalizer.scala
@@ -17,7 +17,7 @@

package org.apache.spark.mllib.feature

import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}

/**
@@ -31,9 +31,11 @@ import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors
*
* @param p Normalization in L^p^ space, p = 2 by default.
*/
@Since("1.1.0")
@Experimental
class Normalizer(p: Double) extends VectorTransformer {

@Since("1.1.0")
def this() = this(2)

require(p >= 1.0)
@@ -44,6 +46,7 @@ class Normalizer(p: Double) extends VectorTransformer {
* @param vector vector to be normalized.
* @return normalized vector. If the norm of the input is zero, it will return the input vector.
*/
@Since("1.1.0")
override def transform(vector: Vector): Vector = {
val norm = Vectors.norm(vector, p)

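A small sketch of the Normalizer usage; the vector is chosen so the L^2 case is easy to check by hand.

import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.linalg.Vectors

val l2 = new Normalizer()        // p = 2 by default
val l1 = new Normalizer(1.0)     // L^1 normalization
val v = Vectors.dense(3.0, 4.0)
l2.transform(v)                  // [0.6, 0.8]  (L^2 norm is 5)
l1.transform(v)                  // [3/7, 4/7]  (L^1 norm is 7)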
mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
@@ -17,6 +17,7 @@

package org.apache.spark.mllib.feature

import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed.RowMatrix
@@ -27,6 +28,7 @@ import org.apache.spark.rdd.RDD
*
* @param k number of principal components
*/
@Since("1.4.0")
class PCA(val k: Int) {
require(k >= 1, s"PCA requires a number of principal components k >= 1 but was given $k")

@@ -35,6 +37,7 @@ class PCA(val k: Int) {
*
* @param sources source vectors
*/
@Since("1.4.0")
def fit(sources: RDD[Vector]): PCAModel = {
require(k <= sources.first().size,
s"source vector size is ${sources.first().size} must be greater than k=$k")
@@ -58,7 +61,10 @@ class PCA(val k: Int) {
new PCAModel(k, pc)
}

/** Java-friendly version of [[fit()]] */
/**
* Java-friendly version of [[fit()]]
*/
@Since("1.4.0")
def fit(sources: JavaRDD[Vector]): PCAModel = fit(sources.rdd)
}

@@ -76,6 +82,7 @@ class PCAModel private[spark] (val k: Int, val pc: DenseMatrix) extends VectorTr
* Vector must be the same length as the source vectors given to [[PCA.fit()]].
* @return transformed vector. Vector will be of length k.
*/
@Since("1.4.0")
override def transform(vector: Vector): Vector = {
vector match {
case dv: DenseVector =>
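A sketch of the PCA and PCAModel calls annotated above. It assumes an existing SparkContext `sc`; the vectors are illustrative.

import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 7.0),
  Vectors.dense(2.0, 1.0, 5.0),
  Vectors.dense(4.0, 3.0, 1.0)))

val model = new PCA(2).fit(data)       // k = 2 principal components
val projected = model.transform(data)  // RDD[Vector], each of length k = 2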
mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
@@ -18,7 +18,7 @@
package org.apache.spark.mllib.feature

import org.apache.spark.Logging
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.rdd.RDD
@@ -32,9 +32,11 @@ import org.apache.spark.rdd.RDD
* dense output, so this does not work on sparse input and will raise an exception.
* @param withStd True by default. Scales the data to unit standard deviation.
*/
@Since("1.1.0")
@Experimental
class StandardScaler(withMean: Boolean, withStd: Boolean) extends Logging {

@Since("1.1.0")
def this() = this(false, true)

if (!(withMean || withStd)) {
@@ -47,6 +49,7 @@ class StandardScaler(withMean: Boolean, withStd: Boolean) extends Logging {
* @param data The data used to compute the mean and variance to build the transformation model.
* @return a StandardScalarModel
*/
@Since("1.1.0")
def fit(data: RDD[Vector]): StandardScalerModel = {
// TODO: skip computation if both withMean and withStd are false
val summary = data.treeAggregate(new MultivariateOnlineSummarizer)(
@@ -69,13 +72,17 @@ class StandardScaler(withMean: Boolean, withStd: Boolean) extends Logging {
* @param withStd whether to scale the data to have unit standard deviation
* @param withMean whether to center the data before scaling
*/
@Since("1.1.0")
@Experimental
class StandardScalerModel (
val std: Vector,
val mean: Vector,
var withStd: Boolean,
var withMean: Boolean) extends VectorTransformer {

/**
*/
@Since("1.3.0")
def this(std: Vector, mean: Vector) {
this(std, mean, withStd = std != null, withMean = mean != null)
require(this.withStd || this.withMean,
@@ -86,15 +93,18 @@ class StandardScalerModel (
}
}

@Since("1.3.0")
def this(std: Vector) = this(std, null)

@Since("1.3.0")
@DeveloperApi
def setWithMean(withMean: Boolean): this.type = {
require(!(withMean && this.mean == null), "cannot set withMean to true while mean is null")
this.withMean = withMean
this
}

@Since("1.3.0")
@DeveloperApi
def setWithStd(withStd: Boolean): this.type = {
require(!(withStd && this.std == null),
@@ -115,6 +125,7 @@ class StandardScalerModel (
* @return Standardized vector. If the std of a column is zero, it will return default `0.0`
* for the column with zero std.
*/
@Since("1.1.0")
override def transform(vector: Vector): Vector = {
require(mean.size == vector.size)
if (withMean) {
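To show how the annotated StandardScaler entry points fit together, a sketch assuming an existing SparkContext `sc`; note that withMean = true requires dense input.

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.dense(1.0, 10.0),
  Vectors.dense(2.0, 20.0),
  Vectors.dense(3.0, 30.0)))

val scaler = new StandardScaler(withMean = true, withStd = true)
val model = scaler.fit(data)        // per-column mean and standard deviation
val scaled = model.transform(data)  // columns centered and scaled to unit std dev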
mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
@@ -17,7 +17,7 @@

package org.apache.spark.mllib.feature

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
* :: DeveloperApi ::
* Trait for transformation of a vector
*/
@Since("1.1.0")
@DeveloperApi
trait VectorTransformer extends Serializable {

@@ -35,6 +36,7 @@ trait VectorTransformer extends Serializable {
* @param vector vector to be transformed.
* @return transformed vector.
*/
@Since("1.1.0")
def transform(vector: Vector): Vector

/**
@@ -43,6 +45,7 @@ trait VectorTransformer extends Serializable {
* @param data RDD[Vector] to be transformed.
* @return transformed RDD[Vector].
*/
@Since("1.1.0")
def transform(data: RDD[Vector]): RDD[Vector] = {
// Later in #1498 , all RDD objects are sent via broadcasting instead of akka.
// So it should be no longer necessary to explicitly broadcast `this` object.
@@ -55,6 +58,7 @@ trait VectorTransformer extends Serializable {
* @param data JavaRDD[Vector] to be transformed.
* @return transformed JavaRDD[Vector].
*/
@Since("1.1.0")
def transform(data: JavaRDD[Vector]): JavaRDD[Vector] = {
transform(data.rdd)
}
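Because VectorTransformer leaves only the single-vector transform abstract (the RDD and JavaRDD variants delegate to it), implementing a transformer is a one-method exercise. AbsTransformer below is a hypothetical example, not part of this change.

import org.apache.spark.mllib.feature.VectorTransformer
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Hypothetical: absolute value of every component (note: densifies sparse input).
class AbsTransformer extends VectorTransformer {
  override def transform(vector: Vector): Vector =
    Vectors.dense(vector.toArray.map(x => math.abs(x)))
}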