diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md
index 4fe470a8de81..1d5286f70632 100644
--- a/docs/mllib-feature-extraction.md
+++ b/docs/mllib-feature-extraction.md
@@ -617,3 +617,43 @@ println("PCA Mean Squared Error = " + MSE_pca)
 {% endhighlight %}
+
+## Significant Selector
+The idea of this transformation is to safely shrink a large vector, such as one produced by `HashingTF`,
+in order to reduce the memory needed to work with it.
+
+This transformation fits a model that keeps only the indices whose values differ across the vectors seen during the fit stage.
+
+### Example
+
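+The example below hashes a few small documents with `HashingTF`, fits a `SignificantSelector` on the
+resulting term-frequency vectors, and then shrinks those vectors down to their significant indices.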
+
+
+{% highlight scala %}
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.feature.{HashingTF, SignificantSelector}
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.rdd.RDD
+
+val hashingTF = new HashingTF()
+val localDocs: Seq[(Double, Array[String])] = Seq(
+  (1d, "a a b b b c d".split(" ")),
+  (0d, "a b c d a b c".split(" ")),
+  (1d, "c b a c b a a".split(" ")))
+
+val docs = sc.parallelize(localDocs, 2)
+
+val tf: RDD[LabeledPoint] =
+  docs.map { case (label, words) => LabeledPoint(label, hashingTF.transform(words)) }
+// scala> tf.first().features.size
+// res4: Int = 1048576
+
+val transformer = new SignificantSelector().fit(tf.map(_.features))
+
+val transformedTf = tf.map(p => p.copy(features = transformer.transform(p.features)))
+// scala> transformedTf.first().features.size
+// res5: Int = 4
+
+// The transformed vectors carry the same informative features as the originals,
+// but need far less memory downstream, for example when training a DecisionTree.
+{% endhighlight %}
+
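+Because every `LabeledPoint` keeps its label, the reduced RDD can be fed straight into a learner.
+Below is a minimal sketch (the tree parameters are illustrative, not tuned) of training a decision
+tree classifier on the reduced vectors:
+
+{% highlight scala %}
+import org.apache.spark.mllib.tree.DecisionTree
+
+// Train on the 4-dimensional vectors instead of the original 2^20-dimensional ones.
+val model = DecisionTree.trainClassifier(transformedTf, numClasses = 2,
+  categoricalFeaturesInfo = Map[Int, Int](), impurity = "gini", maxDepth = 5, maxBins = 32)
+{% endhighlight %}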
+
+
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/SignificantSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/SignificantSelector.scala
new file mode 100644
index 000000000000..6758b41acccd
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/SignificantSelector.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.feature
+
+import scala.collection.mutable
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
+import org.apache.spark.rdd.RDD
+
+/**
+ * :: Experimental ::
+ * Model that keeps only the significant indices of a vector.
+ *
+ * A significant index is one whose value is not identical across all of the vectors
+ * the model was fitted on.
+ *
+ * For example, `HashingTF` produces very large sparse vectors; this model maps them to much
+ * smaller vectors by dropping every index whose value is the same in all vectors.
+ *
+ * @param indices array of significant indices, in ascending order.
+ */
+@Experimental
+class SignificantSelectorModel(val indices: Array[Int]) extends VectorTransformer {
+
+  /**
+   * Applies transformation on a vector.
+   *
+   * @param vector vector to be transformed.
+   * @return transformed vector.
+   */
+  override def transform(vector: Vector): Vector = vector match {
+    case DenseVector(vs) =>
+      Vectors.dense(indices.map(vs))
+
+    case SparseVector(_, ids, vs) =>
+      // Walk the vector's sorted indices and the sorted significant indices in lockstep,
+      // keeping only the entries whose index is significant.
+      var svIdx = 0
+      var newIdx = 0
+      val elements = new mutable.ListBuffer[(Int, Double)]()
+
+      for (idx <- indices) {
+        while (svIdx < ids.length && ids(svIdx) < idx) {
+          svIdx += 1
+        }
+        if (svIdx < ids.length && ids(svIdx) == idx) {
+          elements += ((newIdx, vs(svIdx)))
+          svIdx += 1
+        }
+        newIdx += 1
+      }
+
+      Vectors.sparse(indices.length, elements)
+
+    case v =>
+      throw new IllegalArgumentException("Unsupported vector type " + v.getClass)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Specialized model for the degenerate case where no index is significant:
+ * every vector is transformed into an empty vector.
+ */
+@Experimental
+class SignificantSelectorEmptyModel extends SignificantSelectorModel(Array[Int]()) {
+
+  val emptyVector = Vectors.dense(Array[Double]())
+
+  override def transform(vector: Vector): Vector = emptyVector
+}
+
+/**
+ * :: Experimental ::
+ * Selects the significant indices of a set of vectors.
+ */
+@Experimental
+class SignificantSelector() {
+
+  /**
+   * Fits a selector of significant vector indices.
+   *
+   * @param sources an `RDD[Vector]` containing the vectors.
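+   * @return a [[SignificantSelectorModel]] holding the significant indices, or a
+   *         [[SignificantSelectorEmptyModel]] if no index is significant.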
+   */
+  def fit(sources: RDD[Vector]): SignificantSelectorModel = {
+    val sourcesCount = sources.count()
+    // Emit a (value, index) pair for every explicitly stored element of every vector.
+    val significantIndices = sources.flatMap {
+      case DenseVector(vs) =>
+        vs.zipWithIndex
+      case SparseVector(_, ids, vs) =>
+        vs.zip(ids)
+      case v =>
+        throw new IllegalArgumentException("Unsupported vector type " + v.getClass)
+    }
+      // Count how many vectors hold each (index, value) combination.
+      .map(e => (e.swap, 1))
+      .reduceByKey(_ + _)
+      .map { case ((idx, value), count) => (idx, (value, count)) }
+      .groupByKey()
+      // Count the distinct values seen at each index, adding one for the implicit
+      // zero when some vectors have no explicit entry at that index.
+      .mapValues { e =>
+        val values = e.groupBy(_._1)
+        val sum = e.map(_._2).sum
+
+        values.size + (if (sum == sourcesCount || values.contains(0.0)) 0 else 1)
+      }
+      // An index is significant when it takes more than one distinct value.
+      .filter(_._2 > 1)
+      .keys
+      .collect()
+      .sorted
+
+    if (significantIndices.nonEmpty) {
+      new SignificantSelectorModel(significantIndices)
+    } else {
+      new SignificantSelectorEmptyModel()
+    }
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorSuite.scala
new file mode 100644
index 000000000000..69ae7d9c33eb
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/SignificantSelectorSuite.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.feature
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+
+class SignificantSelectorSuite extends FunSuite with MLlibTestSparkContext {
+  val dv = Vectors.dense(1, 2, 3, 4, 5)
+  val sv1 = Vectors.sparse(5, Seq((0, 1.0), (1, 2.0), (2, 3.0), (3, 4.0), (4, 5.0)))
+  val sv2 = Vectors.sparse(5, Seq((2, 3.0)))
+
+  test("identity transform when every index is significant") {
+    val vectors = sc.parallelize(List(
+      Vectors.dense(0.0, 1.0, 2.0, 3.0, 4.0),
+      Vectors.dense(4.0, 5.0, 6.0, 7.0, 8.0)
+    ))
+
+    val selector = new SignificantSelector().fit(vectors)
+
+    assert(selector.transform(dv) == dv)
+    assert(selector.transform(sv1) == sv1)
+    assert(selector.transform(sv2) == sv2)
+  }
+
+  test("keeps only the significant indices") {
+    val vectors = sc.parallelize(List(
+      Vectors.dense(0.0, 2.0, 3.0, 4.0),
+      Vectors.dense(0.0, 2.0, 3.0, 4.0),
+      Vectors.dense(0.0, 2.0, 3.0, 4.0),
+      Vectors.sparse(4, Seq((1, 3.0), (2, 4.0))),
+      Vectors.dense(0.0, 3.0, 5.0, 4.0),
+      Vectors.dense(0.0, 3.0, 7.0, 4.0)
+    ))
+
+    val selector = new SignificantSelector().fit(vectors)
+
+    // Index 0 is constant (always 0.0), so only indices 1, 2 and 3 survive.
+    val selectedDv = Vectors.dense(2.0, 3.0, 4.0)
+    val selectedSv1 = Vectors.sparse(3, Seq((0, 2.0), (1, 3.0), (2, 4.0)))
+    val selectedSv2 = Vectors.sparse(3, Seq((1, 3.0)))
+
+    assert(selector.transform(dv) == selectedDv)
+    assert(selector.transform(sv1) == selectedSv1)
+    assert(selector.transform(sv2) == selectedSv2)
+  }
+
+  test("empty result vector when no index is significant") {
+    val vectors = sc.parallelize(List(
+      Vectors.dense(0.0, 2.0, 3.0, 4.0),
+      Vectors.dense(0.0, 2.0, 3.0, 4.0)
+    ))
+
+    val selector = new SignificantSelector().fit(vectors)
+
+    val emptyVector = Vectors.dense(Array[Double]())
+
+    assert(selector.transform(dv) == emptyVector)
+    assert(selector.transform(sv1) == emptyVector)
+    assert(selector.transform(sv2) == emptyVector)
+  }
+}