13 changes: 11 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
@@ -17,7 +17,7 @@

package org.apache.spark.ml.feature

import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.attribute.BinaryAttribute
import org.apache.spark.ml.param._
@@ -31,10 +31,12 @@ import org.apache.spark.sql.types.{DoubleType, StructType}
* :: Experimental ::
* Binarize a column of continuous features given a threshold.
*/
@Since("1.4.0")
@Experimental
final class Binarizer(override val uid: String)
final class Binarizer @Since("1.4.0") (@Since("1.4.0") override val uid: String)
extends Transformer with HasInputCol with HasOutputCol {

@Since("1.4.0")
def this() = this(Identifiable.randomUID("binarizer"))

Contributor (review comment): The default constructor should be since 1.2.0. Please check others as well.

Author (reply): Hey @mengxr I'm a little confused here because Binarizer wasn't added until 1.4.0. I did correct the default constructor to 1.2.0 for the original classes from that version (HashingTF, StandardScaler, Tokenizer).

/**
@@ -48,19 +50,24 @@ final class Binarizer(override val uid: String)
new DoubleParam(this, "threshold", "threshold used to binarize continuous features")

/** @group getParam */
@Since("1.4.0")
def getThreshold: Double = $(threshold)

/** @group setParam */
@Since("1.4.0")
def setThreshold(value: Double): this.type = set(threshold, value)

setDefault(threshold -> 0.0)

/** @group setParam */
@Since("1.4.0")
def setInputCol(value: String): this.type = set(inputCol, value)

/** @group setParam */
@Since("1.4.0")
def setOutputCol(value: String): this.type = set(outputCol, value)

@Since("1.4.0")
override def transform(dataset: DataFrame): DataFrame = {
transformSchema(dataset.schema, logging = true)
val td = $(threshold)
@@ -71,6 +78,7 @@ final class Binarizer(override val uid: String)
binarizer(col($(inputCol))).as(outputColName, metadata))
}

@Since("1.4.0")
override def transformSchema(schema: StructType): StructType = {
SchemaUtils.checkColumnType(schema, $(inputCol), DoubleType)

@@ -85,5 +93,6 @@ final class Binarizer(override val uid: String)
StructType(outputFields)
}

@Since("1.4.1")
override def copy(extra: ParamMap): Binarizer = defaultCopy(extra)
}
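
For orientation, a minimal usage sketch of Binarizer as annotated here (not part of the PR; it assumes a 1.4-era SQLContext named sqlContext is in scope, and the data and column names are illustrative):

import org.apache.spark.ml.feature.Binarizer

val data = sqlContext.createDataFrame(Seq(
  (0, 0.1), (1, 0.8), (2, 0.5)
)).toDF("id", "feature")

val binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

// Values strictly greater than 0.5 become 1.0; all others become 0.0.
binarizer.transform(data).show()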
13 changes: 11 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ml.feature
import java.{util => ju}

import org.apache.spark.SparkException
import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.Model
import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.ml.param._
@@ -34,10 +34,12 @@ import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
* :: Experimental ::
* `Bucketizer` maps a column of continuous features to a column of feature buckets.
*/
@Since("1.4.0")
@Experimental
final class Bucketizer(override val uid: String)
final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String)
extends Model[Bucketizer] with HasInputCol with HasOutputCol {

@Since("1.4.0")
def this() = this(Identifiable.randomUID("bucketizer"))

/**
@@ -57,17 +59,22 @@ final class Bucketizer(override val uid: String)
Bucketizer.checkSplits)

/** @group getParam */
@Since("1.4.0")
def getSplits: Array[Double] = $(splits)

/** @group setParam */
@Since("1.4.0")
def setSplits(value: Array[Double]): this.type = set(splits, value)

/** @group setParam */
@Since("1.4.0")
def setInputCol(value: String): this.type = set(inputCol, value)

/** @group setParam */
@Since("1.4.0")
def setOutputCol(value: String): this.type = set(outputCol, value)

@Since("1.4.0")
override def transform(dataset: DataFrame): DataFrame = {
transformSchema(dataset.schema)
val bucketizer = udf { feature: Double =>
@@ -85,11 +92,13 @@ final class Bucketizer(override val uid: String)
attr.toStructField()
}

@Since("1.4.0")
override def transformSchema(schema: StructType): StructType = {
SchemaUtils.checkColumnType(schema, $(inputCol), DoubleType)
SchemaUtils.appendColumn(schema, prepOutputField(schema))
}

@Since("1.4.1")
override def copy(extra: ParamMap): Bucketizer = {
defaultCopy[Bucketizer](extra).setParent(parent)
}
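A minimal usage sketch of Bucketizer (not part of the PR; again assuming a SQLContext named sqlContext is in scope, with illustrative data):

import org.apache.spark.ml.feature.Bucketizer

// Splits must be strictly increasing; the infinite endpoints catch out-of-range values.
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val df = sqlContext.createDataFrame(
  Seq(-999.9, -0.5, -0.3, 0.0, 0.2).map(Tuple1.apply)
).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Each value is replaced by the index of the bucket it falls into.
bucketizer.transform(df).show()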
mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.ml.feature

import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
@@ -103,29 +103,37 @@ private[feature] trait CountVectorizerParams extends Params with HasInputCol wit
* :: Experimental ::
* Extracts a vocabulary from document collections and generates a [[CountVectorizerModel]].
*/
@Since("1.5.0")
@Experimental
class CountVectorizer(override val uid: String)
class CountVectorizer @Since("1.5.0") (@Since("1.5.0") override val uid: String)
extends Estimator[CountVectorizerModel] with CountVectorizerParams {

@Since("1.5.0")
def this() = this(Identifiable.randomUID("cntVec"))

/** @group setParam */
@Since("1.5.0")
def setInputCol(value: String): this.type = set(inputCol, value)

/** @group setParam */
@Since("1.5.0")
def setOutputCol(value: String): this.type = set(outputCol, value)

/** @group setParam */
@Since("1.5.0")
def setVocabSize(value: Int): this.type = set(vocabSize, value)

/** @group setParam */
@Since("1.5.0")
def setMinDF(value: Double): this.type = set(minDF, value)

/** @group setParam */
@Since("1.5.0")
def setMinTF(value: Double): this.type = set(minTF, value)

setDefault(vocabSize -> (1 << 18), minDF -> 1)

@Since("1.5.0")
override def fit(dataset: DataFrame): CountVectorizerModel = {
transformSchema(dataset.schema, logging = true)
val vocSize = $(vocabSize)
@@ -164,10 +172,12 @@ class CountVectorizer(override val uid: String)
copyValues(new CountVectorizerModel(uid, vocab).setParent(this))
}

@Since("1.5.0")
override def transformSchema(schema: StructType): StructType = {
validateAndTransformSchema(schema)
}

@Since("1.5.0")
override def copy(extra: ParamMap): CountVectorizer = defaultCopy(extra)
}

@@ -176,27 +186,34 @@ class CountVectorizer(override val uid: String)
* Converts a text document to a sparse vector of token counts.
* @param vocabulary An Array over terms. Only the terms in the vocabulary will be counted.
*/
@Since("1.5.0")
@Experimental
class CountVectorizerModel(override val uid: String, val vocabulary: Array[String])
class CountVectorizerModel @Since("1.5.0") (@Since("1.5.0") override val uid:
String, @Since("1.5.0") val vocabulary: Array[String])
extends Model[CountVectorizerModel] with CountVectorizerParams {

@Since("1.5.0")
def this(vocabulary: Array[String]) = {
this(Identifiable.randomUID("cntVecModel"), vocabulary)
set(vocabSize, vocabulary.length)
}

/** @group setParam */
@Since("1.5.0")
def setInputCol(value: String): this.type = set(inputCol, value)

/** @group setParam */
@Since("1.5.0")
def setOutputCol(value: String): this.type = set(outputCol, value)

/** @group setParam */
@Since("1.5.0")
def setMinTF(value: Double): this.type = set(minTF, value)

/** Dictionary created from [[vocabulary]] and its indices, broadcast once for [[transform()]] */
private var broadcastDict: Option[Broadcast[Map[String, Int]]] = None

@Since("1.5.0")
override def transform(dataset: DataFrame): DataFrame = {
if (broadcastDict.isEmpty) {
val dict = vocabulary.zipWithIndex.toMap
@@ -224,10 +241,12 @@ class CountVectorizerModel(override val uid: String, val vocabulary: Array[Strin
dataset.withColumn($(outputCol), vectorizer(col($(inputCol))))
}

@Since("1.5.0")
override def transformSchema(schema: StructType): StructType = {
validateAndTransformSchema(schema)
}

@Since("1.5.0")
override def copy(extra: ParamMap): CountVectorizerModel = {
val copied = new CountVectorizerModel(uid, vocabulary).setParent(parent)
copyValues(copied, extra)
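A minimal usage sketch of CountVectorizer and its model (not part of the PR; assumes a SQLContext named sqlContext is in scope, with an illustrative toy corpus):

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}

val df = sqlContext.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")

// Fit a vocabulary of up to 3 terms; a term must appear in at least 2 documents.
val cvModel: CountVectorizerModel = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df)

// Each row becomes a sparse vector of token counts over the fitted vocabulary.
cvModel.transform(df).show()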
12 changes: 10 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/feature/DCT.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ml.feature

import edu.emory.mathcs.jtransforms.dct._

import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.BooleanParam
import org.apache.spark.ml.util.Identifiable
@@ -35,38 +35,46 @@ import org.apache.spark.sql.types.DataType
*
* More information on [[https://en.wikipedia.org/wiki/Discrete_cosine_transform#DCT-II Wikipedia]].
*/
@Since("1.5.0")
@Experimental
class DCT(override val uid: String)
class DCT @Since("1.5.0") (@Since("1.5.0") override val uid: String)
extends UnaryTransformer[Vector, Vector, DCT] {

@Since("1.5.0")
def this() = this(Identifiable.randomUID("dct"))

/**
* Indicates whether to perform the inverse DCT (true) or forward DCT (false).
* Default: false
* @group param
*/
@Since("1.5.0")
def inverse: BooleanParam = new BooleanParam(
this, "inverse", "Set transformer to perform inverse DCT")

/** @group setParam */
@Since("1.5.0")
def setInverse(value: Boolean): this.type = set(inverse, value)

/** @group getParam */
@Since("1.5.0")
def getInverse: Boolean = $(inverse)

setDefault(inverse -> false)

@Since("1.5.0")
override protected def createTransformFunc: Vector => Vector = { vec =>
val result = vec.toArray
val jTransformer = new DoubleDCT_1D(result.length)
if ($(inverse)) jTransformer.inverse(result, true) else jTransformer.forward(result, true)
Vectors.dense(result)
}

@Since("1.5.0")
override protected def validateInputType(inputType: DataType): Unit = {
require(inputType.isInstanceOf[VectorUDT], s"Input type must be VectorUDT but got $inputType.")
}

@Since("1.5.0")
override protected def outputDataType: DataType = new VectorUDT
}
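
A minimal usage sketch of DCT (not part of the PR; assumes a SQLContext named sqlContext is in scope, with illustrative vectors):

import org.apache.spark.ml.feature.DCT
import org.apache.spark.mllib.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0)
).map(Tuple1.apply)

val df = sqlContext.createDataFrame(data).toDF("features")

// setInverse(false) selects the forward DCT-II; true would apply the inverse transform.
val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

dct.transform(df).select("featuresDCT").show()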
mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala
@@ -17,7 +17,7 @@

package org.apache.spark.ml.feature

import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.{ParamMap, Param}
import org.apache.spark.ml.util.Identifiable
@@ -31,10 +31,12 @@ import org.apache.spark.sql.types.DataType
* provided "weight" vector. In other words, it scales each column of the dataset by a scalar
* multiplier.
*/
@Since("1.4.0")
@Experimental
class ElementwiseProduct(override val uid: String)
class ElementwiseProduct @Since("1.4.0") (@Since("1.4.0") override val uid: String)
extends UnaryTransformer[Vector, Vector, ElementwiseProduct] {

@Since("1.4.0")
def this() = this(Identifiable.randomUID("elemProd"))

/**
@@ -44,16 +46,20 @@ class ElementwiseProduct(override val uid: String)
val scalingVec: Param[Vector] = new Param(this, "scalingVec", "vector for hadamard product")

/** @group setParam */
@Since("1.4.0")
def setScalingVec(value: Vector): this.type = set(scalingVec, value)

/** @group getParam */
@Since("1.4.0")
def getScalingVec: Vector = getOrDefault(scalingVec)

@Since("1.4.0")
override protected def createTransformFunc: Vector => Vector = {
require(params.contains(scalingVec), s"transformation requires a weight vector")
val elemScaler = new feature.ElementwiseProduct($(scalingVec))
elemScaler.transform
}

@Since("1.4.0")
override protected def outputDataType: DataType = new VectorUDT()
}
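
A minimal usage sketch of ElementwiseProduct (not part of the PR; assumes a SQLContext named sqlContext is in scope, with illustrative vectors):

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors

val df = sqlContext.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0))
)).toDF("id", "vector")

// Hadamard product with a fixed weight vector: (1,2,3) -> (0,2,6), (4,5,6) -> (0,5,12).
val transformer = new ElementwiseProduct()
  .setScalingVec(Vectors.dense(0.0, 1.0, 2.0))
  .setInputCol("vector")
  .setOutputCol("transformedVector")

transformer.transform(df).show()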
14 changes: 12 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
@@ -17,7 +17,7 @@

package org.apache.spark.ml.feature

import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators}
@@ -32,15 +32,20 @@ import org.apache.spark.sql.types.{ArrayType, StructType}
* :: Experimental ::
* Maps a sequence of terms to their term frequencies using the hashing trick.
*/
@Since("1.2.0")
@Experimental
class HashingTF(override val uid: String) extends Transformer with HasInputCol with HasOutputCol {
class HashingTF @Since("1.2.0") (@Since("1.4.0") override val uid: String) extends Transformer
    with HasInputCol with HasOutputCol {

Contributor (review comment): The constructor with uid should be since 1.4.0. The default constructor below is since 1.2.0. Please check others as well.

@Since("1.2.0")
def this() = this(Identifiable.randomUID("hashingTF"))

/** @group setParam */
@Since("1.4.0")
def setInputCol(value: String): this.type = set(inputCol, value)

/** @group setParam */
@Since("1.4.0")
def setOutputCol(value: String): this.type = set(outputCol, value)

/**
@@ -54,11 +59,14 @@ class HashingTF(override val uid: String) extends Transformer with HasInputCol w
setDefault(numFeatures -> (1 << 18))

/** @group getParam */
@Since("1.2.0")
def getNumFeatures: Int = $(numFeatures)

/** @group setParam */
@Since("1.2.0")
def setNumFeatures(value: Int): this.type = set(numFeatures, value)

@Since("1.4.0")
override def transform(dataset: DataFrame): DataFrame = {
val outputSchema = transformSchema(dataset.schema)
val hashingTF = new feature.HashingTF($(numFeatures))
@@ -67,6 +75,7 @@ class HashingTF(override val uid: String) extends Transformer with HasInputCol w
dataset.select(col("*"), t(col($(inputCol))).as($(outputCol), metadata))
}

@Since("1.4.0")
override def transformSchema(schema: StructType): StructType = {
val inputType = schema($(inputCol)).dataType
require(inputType.isInstanceOf[ArrayType],
@@ -75,5 +84,6 @@ class HashingTF(override val uid: String) extends Transformer with HasInputCol w
SchemaUtils.appendColumn(schema, attrGroup.toStructField())
}

@Since("1.4.1")
override def copy(extra: ParamMap): HashingTF = defaultCopy(extra)
}
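
A minimal usage sketch of HashingTF (not part of the PR; assumes a SQLContext named sqlContext is in scope, and pairs it with Tokenizer for illustration):

import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

val df = sqlContext.createDataFrame(Seq(
  (0, "spark is fast"),
  (1, "spark is general")
)).toDF("id", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")

// Hash each token into a 2^10-dimensional count vector (the default is 2^18 features).
val hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("rawFeatures")
  .setNumFeatures(1 << 10)

hashingTF.transform(tokenizer.transform(df)).select("rawFeatures").show()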