From 5e7c874d42e140864548406e2a7433338f58798e Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Thu, 23 Oct 2014 10:35:42 -0700 Subject: [PATCH 01/13] working on DatasetIndexer --- .../spark/mllib/feature/DatasetIndexer.scala | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala new file mode 100644 index 0000000000000..50f369d5a03e2 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.collection.OpenHashSet + +/** + * :: Experimental :: + * Class for indexing columns in a dataset. + * + * This helps process a dataset of unknown vectors into a dataset with some continuous features + * and some categorical features. The choice between continuous and categorical is based upon + * a maxCategories parameter. + * + * This can also map categorical feature values to 0-based indices. + * + * Usage: + * val myData1: RDD[Vector] = ... + * val myData2: RDD[Vector] = ... + * val datasetIndexer = new DatasetIndexer(maxCategories) + * datasetIndexer.fit(myData1) + * val indexedData1: RDD[Vector] = datasetIndexer.transform(myData1) + * datasetIndexer.fit(myData2) + * val indexedData2: RDD[Vector] = datasetIndexer.transform(myData2) + * val categoricalFeaturesInfo: Map[Int, Int] = datasetIndexer.getCategoricalFeaturesInfo() + */ +@Experimental +class DatasetIndexer( + val maxCategories: Int, + val ignoreUnrecognizedCategories: Boolean = true) + extends Logging { + + /** + * Array (over features) of sets of distinct feature values (up to maxCategories values). + * Null values in array indicate feature has been determined to be continuous. + */ + private var featureValueCounts: Array[OpenHashSet[Double]] = null + + /** + * Scans a dataset once and updates statistics about each column. + * The statistics are used to choose categorical features and re-index them. + * + * Warning: Calling this on a new dataset changes the feature statistics and thus + * can change the behavior of [[transform]] and [[getCategoricalFeatureIndexes]]. + * It is best to [[fit]] on all datasets before calling [[transform]] on any. + * + * @param data Dataset with equal-length vectors. 
+ * NOTE: A single instance of [[DatasetIndexer]] must always be given vectors of + * the same length. If given non-matching vectors, this method will throw an error. + */ + def fit(data: RDD[Vector]): Unit = { + if (featureValueCounts == null) { + val sample = data.take(1) + if (sample.size == 0) { + logWarning("DatasetIndexer given empty RDD") + return + } + val numFeatures = sample(0).size + featureValueCounts = new Array[OpenHashSet[Double]](numFeatures) + var i = 0 + while (i < numFeatures) { + featureValueCounts(i) = new OpenHashSet[Double]() + i += 1 + } + } + val partitionIndexes: RDD[(Array[OpenHashSet[Double]], Int)] = data.mapPartitions { iter => + val partfeatureValueCounts: Array[OpenHashSet[Double]] = featureValueCounts + /* + iter.foreach { v => v match + case dv: DenseVector => + case sv: SparseVector => + } + */ + val violation: Int = 0 + Iterator((partfeatureValueCounts, violation)) + } + /* + } else { + require(featureValueCounts.length == numFeatures, + s"DatasetIndexer given non-matching RDDs. Current RDD of vectors have length $numFeatures, but was previously ") + */ + } + + /** + * + * @param data Dataset with equal-length vectors. + * NOTE: A single instance of [[DatasetIndexer]] must always be given vectors of + * the same length. If given non-matching vectors, this method will throw an error. + * @return Dataset with categorical features modified to have 0-based indices, + * using the index returned by + */ + def transform(data: RDD[Vector]): RDD[Vector] = data.map(this.transform) + + /** + * Based on datasets given to [[fit]], compute an index of the + */ + def getCategoricalFeatureIndexes: Map[Int, Map[Double, Int]] = ??? + + private def fit(datum: Vector): Unit = { + + } + + private def transform(datum: Vector): Vector = { + } +} + +/** + * :: Experimental :: + */ +@Experimental +object DatasetIndexer { + + def + +} From f409987f2f858cd5cbe04d8f9e96453ffb45ab70 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Mon, 27 Oct 2014 11:08:16 -0700 Subject: [PATCH 02/13] partly done with DatasetIndexerSuite --- .../spark/mllib/feature/DatasetIndexer.scala | 209 ++++++++++++++---- .../mllib/feature/DatasetIndexerSuite.scala | 141 ++++++++++++ 2 files changed, 308 insertions(+), 42 deletions(-) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala index 50f369d5a03e2..6403004cdbb43 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala @@ -19,10 +19,59 @@ package org.apache.spark.mllib.feature import org.apache.spark.Logging import org.apache.spark.annotation.Experimental -import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector} +import org.apache.spark.mllib.linalg.{Vectors, DenseVector, SparseVector, Vector} import org.apache.spark.rdd.RDD import org.apache.spark.util.collection.OpenHashSet +class FeatureValueStats(val numFeatures: Int, val maxCategories: Int) + extends Serializable { + + val featureValueSets = new Array[OpenHashSet[Double]](numFeatures) + + /** + * Merge other [[FeatureValueStats]] into this instance, modifying this instance. + * @param other Other instance. Not modified. 
+ * @return This instance + */ + def merge(other: FeatureValueStats): FeatureValueStats = { + featureValueSets.zip(other.featureValueSets).foreach { case (fvs1, fvs2) => + fvs2.iterator.foreach { val2 => + if (fvs1.size <= maxCategories) fvs1.add(val2) + } + } + this + } + + def addDenseVector(dv: DenseVector): Unit = { + var i = 0 + while (i < dv.size) { + if (featureValueSets(i).size <= maxCategories) { + featureValueSets(i).add(dv(i)) + } + i += 1 + } + } + + def addSparseVector(sv: SparseVector): Unit = { + // TODO: This could be made more efficient. + var vecIndex = 0 // index into vector + var nzIndex = 0 // index into non-zero elements + while (vecIndex < sv.size) { + val featureValue = if (nzIndex < sv.indices.size && vecIndex == sv.indices(nzIndex)) { + nzIndex += 1 + sv.values(nzIndex - 1) + } else { + 0.0 + } + if (featureValueSets(vecIndex).size <= maxCategories) { + featureValueSets(vecIndex).add(featureValue) + } + vecIndex += 1 + } + } + +} + /** * :: Experimental :: * Class for indexing columns in a dataset. @@ -49,11 +98,15 @@ class DatasetIndexer( val ignoreUnrecognizedCategories: Boolean = true) extends Logging { + /** * Array (over features) of sets of distinct feature values (up to maxCategories values). * Null values in array indicate feature has been determined to be continuous. + * + * Once the number of elements in a feature's set reaches maxCategories + 1, + * then it is declared continuous, and we stop adding elements. */ - private var featureValueCounts: Array[OpenHashSet[Double]] = null + private var featureValueStats: Option[FeatureValueStats] = None /** * Scans a dataset once and updates statistics about each column. @@ -68,36 +121,93 @@ class DatasetIndexer( * the same length. If given non-matching vectors, this method will throw an error. */ def fit(data: RDD[Vector]): Unit = { - if (featureValueCounts == null) { - val sample = data.take(1) - if (sample.size == 0) { - logWarning("DatasetIndexer given empty RDD") - return - } - val numFeatures = sample(0).size - featureValueCounts = new Array[OpenHashSet[Double]](numFeatures) - var i = 0 - while (i < numFeatures) { - featureValueCounts(i) = new OpenHashSet[Double]() - i += 1 + // For each partition, get (featureValueStats, newNumFeatures). + // If all vectors have the same length, then newNumFeatures will be set to numFeatures. + // If a vector with a new length is found, then newNumFeatures is set to that length. + val partitionFeatureValueSets: RDD[(Option[FeatureValueStats], Int)] = data.mapPartitions { iter => + // Make local copy of featureValueStats. + // This will be None initially if this is the first dataset to be fitted. + var localFeatureValueStats: Option[FeatureValueStats] = featureValueStats + var newNumFeatures: Int = localFeatureValueStats match { + case Some(fvs) => fvs.numFeatures + case None => -1 } - } - val partitionIndexes: RDD[(Array[OpenHashSet[Double]], Int)] = data.mapPartitions { iter => - val partfeatureValueCounts: Array[OpenHashSet[Double]] = featureValueCounts /* - iter.foreach { v => v match + // TODO: Track which features are known to be continuous already, and do not bother + // updating counts for them. Probably store featureValueCounts in a linked list. 
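Editorial aside (not part of the patch): the addSparseVector method introduced above visits every position of the vector, not just the stored entries, so that implicit zeros are counted as observed feature values. A standalone sketch of that two-pointer walk, using plain Scala arrays rather than Spark's SparseVector (the name denseValues and its signature are illustrative only):

    // Recover the full-length value sequence, including implicit zeros, from a
    // sparse (indices, values) encoding by walking the vector position and the
    // non-zero entries in lockstep.
    def denseValues(size: Int, indices: Array[Int], values: Array[Double]): Array[Double] = {
      val out = new Array[Double](size)  // entries default to 0.0
      var vecIndex = 0  // index into the full-length vector
      var nzIndex = 0   // index into the non-zero entries
      while (vecIndex < size) {
        if (nzIndex < indices.length && vecIndex == indices(nzIndex)) {
          out(vecIndex) = values(nzIndex)
          nzIndex += 1
        }
        vecIndex += 1
      }
      out
    }
    // denseValues(3, Array(0, 2), Array(1.0, -1.0)).toSeq == Seq(1.0, 0.0, -1.0)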
+ iter.foreach { _ match { case dv: DenseVector => + localFeatureValueStats match { + case Some(fvs) => + if (fvs.numFeatures == dv.size) { + fvs.addDenseVector(dv) + } else { + // non-matching vector lengths + newNumFeatures = dv.size + } + case None => + newNumFeatures = dv.size + localFeatureValueStats = Some(new FeatureValueStats(dv.size, maxCategories)) + localFeatureValueStats.get.addDenseVector(dv) + } case sv: SparseVector => - } + localFeatureValueStats match { + case Some(fvs) => + if (fvs.numFeatures == sv.size) { + fvs.addSparseVector(sv) + } else { + // non-matching vector lengths + newNumFeatures = sv.size + } + case None => + newNumFeatures = sv.size + localFeatureValueStats = Some(new FeatureValueStats(sv.size, maxCategories)) + localFeatureValueStats.get.addSparseVector(sv) + } + }} */ - val violation: Int = 0 - Iterator((partfeatureValueCounts, violation)) + Iterator((localFeatureValueStats, newNumFeatures)) } + val (aggFeatureValueStats: Option[FeatureValueStats], newNumFeatures: Int) = (None, -1) /* - } else { - require(featureValueCounts.length == numFeatures, - s"DatasetIndexer given non-matching RDDs. Current RDD of vectors have length $numFeatures, but was previously ") + partitionFeatureValueSets.fold((None, -1)) { + case ((Some(fvs1), newNumFeatures1), (Some(fvs2), newNumFeatures2)) => + if (fvs2.numFeatures == fvs1.numFeatures) { + val mergedFVS = fvs1.merge(fvs2) + (newNumFeatures1, newNumFeatures2) match { + case (-1, -1) => + // good: vector lengths match + (Some(mergedFVS), -1) + case (-1, _) => + (Some(mergedFVS), newNumFeatures2) + case (_, _) => + (Some(mergedFVS), newNumFeatures1) + } + } else { + // non-matching vector lengths + (Some(fvs1), fvs2.numFeatures) + } + case ((Some(fvs1), newNumFeatures1), (None, -1)) => + (Some(fvs1), newNumFeatures1) + case ((None, -1), (Some(fvs2), newNumFeatures2)) => + (Some(fvs2), newNumFeatures2) + case ((None, -1), (None, -1)) => + (None, -1) + } */ + if (newNumFeatures != -1) { + throw new RuntimeException("DatasetIndexer given records of non-matching length." + + s" Found records with length ${aggFeatureValueStats.get.numFeatures} and length" + + s" $newNumFeatures") + } + (featureValueStats, aggFeatureValueStats) match { + case (Some(origFVS), Some(newFVS)) => + origFVS.merge(newFVS) + case (None, Some(newFVS)) => + featureValueStats = Some(newFVS) + case _ => + logDebug("DatasetIndexer.fit(rdd) called on RDD with 0 rows.") + } } /** @@ -108,27 +218,42 @@ class DatasetIndexer( * @return Dataset with categorical features modified to have 0-based indices, * using the index returned by */ - def transform(data: RDD[Vector]): RDD[Vector] = data.map(this.transform) + def transform(data: RDD[Vector]): RDD[Vector] = { + val catFeatIdx = getCategoricalFeatureIndexes + data.map { v: Vector => v match { + case dv: DenseVector => + catFeatIdx.foreach { case (featureIndex, categoryMap) => + dv.values(featureIndex) = categoryMap(dv(featureIndex)) + } + dv.asInstanceOf[Vector] + case sv: SparseVector => + // TODO: This currently converts to a dense vector. After updating + // getCategoricalFeatureIndexes, make this maintain sparsity when possible. + val dv = sv.toArray + catFeatIdx.foreach { case (featureIndex, categoryMap) => + dv(featureIndex) = categoryMap(dv(featureIndex)) + } + Vectors.dense(dv) + }} + } /** - * Based on datasets given to [[fit]], compute an index of the + * Based on datasets given to [[fit]], decide which features are categorical, + * and choose indices for categories. + * @return Feature index. 
Keys are categorical feature indices (column indices). + * Values are mappings from original features values to 0-based category indices. */ - def getCategoricalFeatureIndexes: Map[Int, Map[Double, Int]] = ??? - - private def fit(datum: Vector): Unit = { - + def getCategoricalFeatureIndexes: Map[Int, Map[Double, Int]] = featureValueStats match { + // TODO: It would be ideal to have value 0 set to index 0 to maintain sparsity if possible. + case Some(fvs) => + fvs.featureValueSets.zipWithIndex + .filter(_._1.size <= maxCategories).map { case (featureValues, featureIndex) => + val categoryMap: Map[Double, Int] = featureValues.iterator.toList.sorted.zipWithIndex.toMap + (featureIndex, categoryMap) + }.toMap + case None => + throw new RuntimeException("DatasetIndexer.getCategoricalFeatureIndexes called," + + " but no datasets have been indexed via fit() yet.") } - private def transform(datum: Vector): Vector = { - } -} - -/** - * :: Experimental :: - */ -@Experimental -object DatasetIndexer { - - def - } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala new file mode 100644 index 0000000000000..d7d74f773fad5 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.feature + +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} +import org.apache.spark.mllib.random.RandomRDDs +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.util.TestingUtils._ +import org.scalatest.FunSuite + +class DatasetIndexerSuite extends FunSuite with LocalSparkContext { + + test("Can fit an empty RDD") { + val rdd = sc.parallelize(Array.empty[Vector]) + val datasetIndexer = new DatasetIndexer(maxCategories = 10) + datasetIndexer.fit(rdd) + } + + test("If not fitted, throws error when transforming RDD or getting feature indexes") { + val points = Seq(Array(1.0, 2.0), Array(0.0, 1.0)) + val rdd = sc.parallelize(points.map(Vectors.dense)) + val datasetIndexer = new DatasetIndexer(maxCategories = 10) + intercept[RuntimeException] { + datasetIndexer.transform(rdd) + println("Did not throw error when transforming before fitting.") + } + intercept[RuntimeException] { + datasetIndexer.getCategoricalFeatureIndexes + println("Did not throw error when getting feature indexes before fitting.") + } + } + + test("Throws error when given RDDs with different size vectors") { + val points1 = Seq( + Array(1.0, 2.0), + Array(0.0, 1.0, 2.0), + Array(-1.0, 3.0)) + val rdd1 = sc.parallelize(points1.map(Vectors.dense)) + val points2a = Seq( + Array(1.0, 2.0), + Array(-1.0, 3.0)) + val rdd2a = sc.parallelize(points2a.map(Vectors.dense)) + val points2b = Seq( + Array(1.0), + Array(-1.0)) + val rdd2b = sc.parallelize(points2b.map(Vectors.dense)) + val rdd3 = sc.parallelize(Array.empty[Vector]) + + val datasetIndexer1 = new DatasetIndexer(maxCategories = 10) + intercept[RuntimeException] { + datasetIndexer1.fit(rdd1) + println("Did not throw error when fitting vectors of different lengths in same RDD.") + } + val datasetIndexer2 = new DatasetIndexer(maxCategories = 10) + datasetIndexer2.fit(rdd2a) + intercept[RuntimeException] { + datasetIndexer2.fit(rdd2b) + println("Did not throw error when fitting vectors of different lengths in two RDDs.") + } + val datasetIndexer3 = new DatasetIndexer(maxCategories = 10) + datasetIndexer3.fit(rdd3) // does nothing + datasetIndexer3.fit(rdd2a) // should work + } + + test("Same result with dense and sparse vectors") { + + def testDenseSparse(densePoints: Seq[Vector], sparsePoints: Seq[Vector]): Unit = { + assert(densePoints.zip(sparsePoints).forall { case (dv, sv) => dv.toArray == sv.toArray }, + "typo in unit test") + val denseRDD = sc.parallelize(densePoints) + val sparseRDD = sc.parallelize(sparsePoints) + + val denseDatasetIndexer = new DatasetIndexer(maxCategories = 2) + val sparseDatasetIndexer = new DatasetIndexer(maxCategories = 2) + denseDatasetIndexer.fit(denseRDD) + sparseDatasetIndexer.fit(sparseRDD) + val denseFeatureIndexes = denseDatasetIndexer.getCategoricalFeatureIndexes + val sparseFeatureIndexes = sparseDatasetIndexer.getCategoricalFeatureIndexes + val categoricalFeatures = denseFeatureIndexes.keys.toSet + assert(categoricalFeatures == sparseFeatureIndexes.keys.toSet, + "Categorical features chosen from dense vs. sparse vectors did not match.") + + assert(denseFeatureIndexes == sparseFeatureIndexes, + "Categorical feature value indexes chosen from dense vs. 
sparse vectors did not match.") + } + + val densePoints1 = Seq( + Array(1.0, 2.0, 0.0), + Array(0.0, 1.0, 2.0), + Array(0.0, 0.0, -1.0), + Array(1.0, 3.0, 2.0)).map(Vectors.dense) + val sparsePoints1 = Seq( + Vectors.sparse(3, Array(0, 1), Array(1.0, 2.0)), + Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)), + Vectors.sparse(3, Array(2), Array(-1.0)), + Vectors.sparse(3, Array(0, 1, 2), Array(1.0, 3.0, 2.0))) + testDenseSparse(densePoints1, sparsePoints1) + + val densePoints2 = Seq( + Array(1.0, 1.0, 0.0), + Array(0.0, 1.0, 0.0), + Array(-1.0, 1.0, 0.0)).map(Vectors.dense) + val sparsePoints2 = Seq( + Vectors.sparse(3, Array(0, 1), Array(1.0, 1.0)), + Vectors.sparse(3, Array(1), Array(1.0)), + Vectors.sparse(3, Array(0, 1), Array(-1.0, 1.0))) + testDenseSparse(densePoints2, sparsePoints2) + } + + test("Chooses between categorical and continuous features correctly") { + val points1 = Seq( + Array(1.0, 2.0, 0.0), + Array(0.0, 1.0, 2.0), + Array(0.0, 0.0, -1.0), + Array(1.0, 3.0, 2.0)).map(Vectors.dense) + val rdd1 = sc.parallelize(points1) + val datasetIndexer1 = new DatasetIndexer(maxCategories = 2) + datasetIndexer1.fit(rdd1) + val featureIndex1 = datasetIndexer1.getCategoricalFeatureIndexes + } + + test("Indexes categorical features correctly") { + } + +} From 20069237f5c430b916c2b1ddbf3c8c72d56bed4d Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Mon, 27 Oct 2014 16:08:14 -0700 Subject: [PATCH 03/13] DatasetIndexer now passes tests --- .../spark/mllib/feature/DatasetIndexer.scala | 148 ++++++++---------- .../mllib/feature/DatasetIndexerSuite.scala | 4 +- 2 files changed, 71 insertions(+), 81 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala index 6403004cdbb43..f08b7e246df6b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala @@ -23,55 +23,6 @@ import org.apache.spark.mllib.linalg.{Vectors, DenseVector, SparseVector, Vector import org.apache.spark.rdd.RDD import org.apache.spark.util.collection.OpenHashSet -class FeatureValueStats(val numFeatures: Int, val maxCategories: Int) - extends Serializable { - - val featureValueSets = new Array[OpenHashSet[Double]](numFeatures) - - /** - * Merge other [[FeatureValueStats]] into this instance, modifying this instance. - * @param other Other instance. Not modified. - * @return This instance - */ - def merge(other: FeatureValueStats): FeatureValueStats = { - featureValueSets.zip(other.featureValueSets).foreach { case (fvs1, fvs2) => - fvs2.iterator.foreach { val2 => - if (fvs1.size <= maxCategories) fvs1.add(val2) - } - } - this - } - - def addDenseVector(dv: DenseVector): Unit = { - var i = 0 - while (i < dv.size) { - if (featureValueSets(i).size <= maxCategories) { - featureValueSets(i).add(dv(i)) - } - i += 1 - } - } - - def addSparseVector(sv: SparseVector): Unit = { - // TODO: This could be made more efficient. - var vecIndex = 0 // index into vector - var nzIndex = 0 // index into non-zero elements - while (vecIndex < sv.size) { - val featureValue = if (nzIndex < sv.indices.size && vecIndex == sv.indices(nzIndex)) { - nzIndex += 1 - sv.values(nzIndex - 1) - } else { - 0.0 - } - if (featureValueSets(vecIndex).size <= maxCategories) { - featureValueSets(vecIndex).add(featureValue) - } - vecIndex += 1 - } - } - -} - /** * :: Experimental :: * Class for indexing columns in a dataset. 
@@ -96,8 +47,56 @@ class FeatureValueStats(val numFeatures: Int, val maxCategories: Int) class DatasetIndexer( val maxCategories: Int, val ignoreUnrecognizedCategories: Boolean = true) - extends Logging { + extends Logging with Serializable { + private class FeatureValueStats(val numFeatures: Int, val maxCategories: Int) + extends Serializable { + + val featureValueSets = Array.fill[OpenHashSet[Double]](numFeatures)(new OpenHashSet[Double]()) + + /** + * Merge other [[FeatureValueStats]] into this instance, modifying this instance. + * @param other Other instance. Not modified. + * @return This instance + */ + def merge(other: FeatureValueStats): FeatureValueStats = { + featureValueSets.zip(other.featureValueSets).foreach { case (fvs1, fvs2) => + fvs2.iterator.foreach { val2 => + if (fvs1.size <= maxCategories) fvs1.add(val2) + } + } + this + } + + def addDenseVector(dv: DenseVector): Unit = { + var i = 0 + while (i < dv.size) { + if (featureValueSets(i).size <= maxCategories) { + featureValueSets(i).add(dv(i)) + } + i += 1 + } + } + + def addSparseVector(sv: SparseVector): Unit = { + // TODO: This could be made more efficient. + var vecIndex = 0 // index into vector + var nzIndex = 0 // index into non-zero elements + while (vecIndex < sv.size) { + val featureValue = if (nzIndex < sv.indices.size && vecIndex == sv.indices(nzIndex)) { + nzIndex += 1 + sv.values(nzIndex - 1) + } else { + 0.0 + } + if (featureValueSets(vecIndex).size <= maxCategories) { + featureValueSets(vecIndex).add(featureValue) + } + vecIndex += 1 + } + } + + } /** * Array (over features) of sets of distinct feature values (up to maxCategories values). @@ -122,20 +121,16 @@ class DatasetIndexer( */ def fit(data: RDD[Vector]): Unit = { // For each partition, get (featureValueStats, newNumFeatures). - // If all vectors have the same length, then newNumFeatures will be set to numFeatures. - // If a vector with a new length is found, then newNumFeatures is set to that length. + // If all vectors have the same length, then newNumFeatures = -1. + // If a vector with a new length is found, then newNumFeatures is set to that length. val partitionFeatureValueSets: RDD[(Option[FeatureValueStats], Int)] = data.mapPartitions { iter => // Make local copy of featureValueStats. // This will be None initially if this is the first dataset to be fitted. var localFeatureValueStats: Option[FeatureValueStats] = featureValueStats - var newNumFeatures: Int = localFeatureValueStats match { - case Some(fvs) => fvs.numFeatures - case None => -1 - } - /* + var localNumFeatures: Int = -1 // TODO: Track which features are known to be continuous already, and do not bother // updating counts for them. Probably store featureValueCounts in a linked list. 
- iter.foreach { _ match { + iter.foreach { case dv: DenseVector => localFeatureValueStats match { case Some(fvs) => @@ -143,10 +138,9 @@ class DatasetIndexer( fvs.addDenseVector(dv) } else { // non-matching vector lengths - newNumFeatures = dv.size + localNumFeatures = dv.size } case None => - newNumFeatures = dv.size localFeatureValueStats = Some(new FeatureValueStats(dv.size, maxCategories)) localFeatureValueStats.get.addDenseVector(dv) } @@ -157,32 +151,25 @@ class DatasetIndexer( fvs.addSparseVector(sv) } else { // non-matching vector lengths - newNumFeatures = sv.size + localNumFeatures = sv.size } case None => - newNumFeatures = sv.size localFeatureValueStats = Some(new FeatureValueStats(sv.size, maxCategories)) localFeatureValueStats.get.addSparseVector(sv) } - }} - */ - Iterator((localFeatureValueStats, newNumFeatures)) + } + Iterator((localFeatureValueStats, localNumFeatures)) } - val (aggFeatureValueStats: Option[FeatureValueStats], newNumFeatures: Int) = (None, -1) - /* + val (aggFeatureValueStats: Option[FeatureValueStats], newNumFeatures: Int) = partitionFeatureValueSets.fold((None, -1)) { case ((Some(fvs1), newNumFeatures1), (Some(fvs2), newNumFeatures2)) => if (fvs2.numFeatures == fvs1.numFeatures) { - val mergedFVS = fvs1.merge(fvs2) - (newNumFeatures1, newNumFeatures2) match { - case (-1, -1) => - // good: vector lengths match - (Some(mergedFVS), -1) - case (-1, _) => - (Some(mergedFVS), newNumFeatures2) - case (_, _) => - (Some(mergedFVS), newNumFeatures1) + val tmpNumFeatures = (newNumFeatures1, newNumFeatures2) match { + case (-1, -1) => -1 // good: vector lengths match + case (-1, _) => newNumFeatures2 + case (_, _) => newNumFeatures1 } + (Some(fvs1.merge(fvs2)), tmpNumFeatures) } else { // non-matching vector lengths (Some(fvs1), fvs2.numFeatures) @@ -194,7 +181,6 @@ class DatasetIndexer( case ((None, -1), (None, -1)) => (None, -1) } - */ if (newNumFeatures != -1) { throw new RuntimeException("DatasetIndexer given records of non-matching length." + s" Found records with length ${aggFeatureValueStats.get.numFeatures} and length" + @@ -211,12 +197,16 @@ class DatasetIndexer( } /** + * Transforms the given dataset using the indexes returned by [[getCategoricalFeatureIndexes]]. + * Categorical features are mapped to their feature value indices. + * Continuous features (columns) are left unchanged. + * + * This currently converts sparse vectors to dense ones. * * @param data Dataset with equal-length vectors. * NOTE: A single instance of [[DatasetIndexer]] must always be given vectors of * the same length. If given non-matching vectors, this method will throw an error. - * @return Dataset with categorical features modified to have 0-based indices, - * using the index returned by + * @return Dataset with categorical features modified to use 0-based indices. 
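+ * Example (editorial illustration, not part of the original patch): if maxCategories >= 2 and [[fit]] only ever
+ * saw the values 1.0 and 3.0 in column 2, then getCategoricalFeatureIndexes(2) == Map(1.0 -> 0, 3.0 -> 1),
+ * and transform maps a vector (0.5, 2.3, 3.0) to (0.5, 2.3, 1.0), leaving columns 0 and 1 unchanged
+ * (assuming they were found to be continuous).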
*/ def transform(data: RDD[Vector]): RDD[Vector] = { val catFeatIdx = getCategoricalFeatureIndexes diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala index d7d74f773fad5..1a66e28f06a96 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala @@ -81,8 +81,8 @@ class DatasetIndexerSuite extends FunSuite with LocalSparkContext { test("Same result with dense and sparse vectors") { def testDenseSparse(densePoints: Seq[Vector], sparsePoints: Seq[Vector]): Unit = { - assert(densePoints.zip(sparsePoints).forall { case (dv, sv) => dv.toArray == sv.toArray }, - "typo in unit test") + assert(densePoints.zip(sparsePoints).forall { case (dv, sv) => dv.toArray === sv.toArray }, + s"typo in unit test") val denseRDD = sc.parallelize(densePoints) val sparseRDD = sc.parallelize(sparsePoints) From 3a4a0bd73126e53c1d9a85e5ba79079c337b55a7 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Tue, 28 Oct 2014 10:27:49 -0700 Subject: [PATCH 04/13] Added another test for DatasetIndexer --- .../spark/mllib/feature/DatasetIndexer.scala | 5 ++- .../mllib/feature/DatasetIndexerSuite.scala | 36 +++++++++++-------- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala index f08b7e246df6b..8801efa65783c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala @@ -42,6 +42,8 @@ import org.apache.spark.util.collection.OpenHashSet * datasetIndexer.fit(myData2) * val indexedData2: RDD[Vector] = datasetIndexer.transform(myData2) * val categoricalFeaturesInfo: Map[Int, Int] = datasetIndexer.getCategoricalFeaturesInfo() + * + * TODO: Add option for transform: defaultForUnknownValue (default index for unknown category). 
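+ * (Editorial note, not part of the original patch: the categoricalFeaturesInfo map expected by the tree
+ * algorithms, feature index -> number of categories, can be derived from this class's output as
+ * getCategoricalFeatureIndexes.mapValues(_.size).)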
*/ @Experimental class DatasetIndexer( @@ -238,7 +240,8 @@ class DatasetIndexer( case Some(fvs) => fvs.featureValueSets.zipWithIndex .filter(_._1.size <= maxCategories).map { case (featureValues, featureIndex) => - val categoryMap: Map[Double, Int] = featureValues.iterator.toList.sorted.zipWithIndex.toMap + val sortedFeatureValues = featureValues.iterator.toList.sorted + val categoryMap: Map[Double, Int] = sortedFeatureValues.zipWithIndex.toMap (featureIndex, categoryMap) }.toMap case None => diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala index 1a66e28f06a96..e60bb54d9d479 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala @@ -17,11 +17,8 @@ package org.apache.spark.mllib.feature -import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} -import org.apache.spark.mllib.random.RandomRDDs +import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.util.LocalSparkContext -import org.apache.spark.mllib.util.TestingUtils._ import org.scalatest.FunSuite class DatasetIndexerSuite extends FunSuite with LocalSparkContext { @@ -123,19 +120,30 @@ class DatasetIndexerSuite extends FunSuite with LocalSparkContext { testDenseSparse(densePoints2, sparsePoints2) } - test("Chooses between categorical and continuous features correctly") { - val points1 = Seq( + test("Builds correct categorical feature value index") { + def checkCategoricalFeatureIndex(values: Seq[Double], valueIndex: Map[Double, Int]): Unit = { + val valSet = values.toSet + assert(valueIndex.keys.toSet === valSet) + assert(valueIndex.values.toSet === Range(0, valSet.size).toSet) + } + val points = Seq( Array(1.0, 2.0, 0.0), Array(0.0, 1.0, 2.0), Array(0.0, 0.0, -1.0), Array(1.0, 3.0, 2.0)).map(Vectors.dense) - val rdd1 = sc.parallelize(points1) - val datasetIndexer1 = new DatasetIndexer(maxCategories = 2) - datasetIndexer1.fit(rdd1) - val featureIndex1 = datasetIndexer1.getCategoricalFeatureIndexes + val rdd = sc.parallelize(points, 2) + + val datasetIndexer2 = new DatasetIndexer(maxCategories = 2) + datasetIndexer2.fit(rdd) + val featureIndex2 = datasetIndexer2.getCategoricalFeatureIndexes + assert(featureIndex2.keys.toSet === Set(0)) + checkCategoricalFeatureIndex(points.map(_(0)), featureIndex2(0)) + + val datasetIndexer3 = new DatasetIndexer(maxCategories = 3) + datasetIndexer3.fit(rdd) + val featureIndex3 = datasetIndexer3.getCategoricalFeatureIndexes + assert(featureIndex3.keys.toSet === Set(0, 2)) + checkCategoricalFeatureIndex(points.map(_(0)), featureIndex3(0)) + checkCategoricalFeatureIndex(points.map(_(2)), featureIndex3(2)) } - - test("Indexes categorical features correctly") { - } - } From 038b9e3dfab141857b0472e879503d4145b5fe8e Mon Sep 17 00:00:00 2001 From: "Joseph K. 
Bradley" Date: Wed, 29 Oct 2014 11:44:19 -0700 Subject: [PATCH 05/13] DatasetIndexer now maintains sparsity in SparseVector --- .../spark/mllib/feature/DatasetIndexer.scala | 134 +++++++++++------- .../mllib/feature/DatasetIndexerSuite.scala | 131 ++++++++++------- 2 files changed, 158 insertions(+), 107 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala index 8801efa65783c..0767ed9796947 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala @@ -44,6 +44,8 @@ import org.apache.spark.util.collection.OpenHashSet * val categoricalFeaturesInfo: Map[Int, Int] = datasetIndexer.getCategoricalFeaturesInfo() * * TODO: Add option for transform: defaultForUnknownValue (default index for unknown category). + * + * TODO: Add warning if a categorical feature has only 1 category. */ @Experimental class DatasetIndexer( @@ -51,6 +53,9 @@ class DatasetIndexer( val ignoreUnrecognizedCategories: Boolean = true) extends Logging with Serializable { + require(maxCategories > 1, + s"DatasetIndexer given maxCategories = $maxCategories, but requires maxCategories > 1.") + private class FeatureValueStats(val numFeatures: Int, val maxCategories: Int) extends Serializable { @@ -125,43 +130,44 @@ class DatasetIndexer( // For each partition, get (featureValueStats, newNumFeatures). // If all vectors have the same length, then newNumFeatures = -1. // If a vector with a new length is found, then newNumFeatures is set to that length. - val partitionFeatureValueSets: RDD[(Option[FeatureValueStats], Int)] = data.mapPartitions { iter => - // Make local copy of featureValueStats. - // This will be None initially if this is the first dataset to be fitted. - var localFeatureValueStats: Option[FeatureValueStats] = featureValueStats - var localNumFeatures: Int = -1 - // TODO: Track which features are known to be continuous already, and do not bother - // updating counts for them. Probably store featureValueCounts in a linked list. - iter.foreach { - case dv: DenseVector => - localFeatureValueStats match { - case Some(fvs) => - if (fvs.numFeatures == dv.size) { - fvs.addDenseVector(dv) - } else { - // non-matching vector lengths - localNumFeatures = dv.size - } - case None => - localFeatureValueStats = Some(new FeatureValueStats(dv.size, maxCategories)) - localFeatureValueStats.get.addDenseVector(dv) - } - case sv: SparseVector => - localFeatureValueStats match { - case Some(fvs) => - if (fvs.numFeatures == sv.size) { - fvs.addSparseVector(sv) - } else { - // non-matching vector lengths - localNumFeatures = sv.size - } - case None => - localFeatureValueStats = Some(new FeatureValueStats(sv.size, maxCategories)) - localFeatureValueStats.get.addSparseVector(sv) - } + val partitionFeatureValueSets: RDD[(Option[FeatureValueStats], Int)] = + data.mapPartitions { iter => + // Make local copy of featureValueStats. + // This will be None initially if this is the first dataset to be fitted. + var localFeatureValueStats: Option[FeatureValueStats] = featureValueStats + var localNumFeatures: Int = -1 + // TODO: Track which features are known to be continuous already, and do not bother + // updating counts for them. Probably store featureValueStats in a linked list. 
+ iter.foreach { + case dv: DenseVector => + localFeatureValueStats match { + case Some(fvs) => + if (fvs.numFeatures == dv.size) { + fvs.addDenseVector(dv) + } else { + // non-matching vector lengths + localNumFeatures = dv.size + } + case None => + localFeatureValueStats = Some(new FeatureValueStats(dv.size, maxCategories)) + localFeatureValueStats.get.addDenseVector(dv) + } + case sv: SparseVector => + localFeatureValueStats match { + case Some(fvs) => + if (fvs.numFeatures == sv.size) { + fvs.addSparseVector(sv) + } else { + // non-matching vector lengths + localNumFeatures = sv.size + } + case None => + localFeatureValueStats = Some(new FeatureValueStats(sv.size, maxCategories)) + localFeatureValueStats.get.addSparseVector(sv) + } + } + Iterator((localFeatureValueStats, localNumFeatures)) } - Iterator((localFeatureValueStats, localNumFeatures)) - } val (aggFeatureValueStats: Option[FeatureValueStats], newNumFeatures: Int) = partitionFeatureValueSets.fold((None, -1)) { case ((Some(fvs1), newNumFeatures1), (Some(fvs2), newNumFeatures2)) => @@ -212,35 +218,57 @@ class DatasetIndexer( */ def transform(data: RDD[Vector]): RDD[Vector] = { val catFeatIdx = getCategoricalFeatureIndexes - data.map { v: Vector => v match { - case dv: DenseVector => - catFeatIdx.foreach { case (featureIndex, categoryMap) => - dv.values(featureIndex) = categoryMap(dv(featureIndex)) - } - dv.asInstanceOf[Vector] - case sv: SparseVector => - // TODO: This currently converts to a dense vector. After updating - // getCategoricalFeatureIndexes, make this maintain sparsity when possible. - val dv = sv.toArray - catFeatIdx.foreach { case (featureIndex, categoryMap) => - dv(featureIndex) = categoryMap(dv(featureIndex)) + data.mapPartitions { iterator => + val sortedCategoricalFeatureIndices = catFeatIdx.keys.toArray.sorted + iterator.map { v: Vector => + v match { + case dv: DenseVector => + catFeatIdx.foreach { case (featureIndex, categoryMap) => + dv.values(featureIndex) = categoryMap(dv(featureIndex)) + } + dv.asInstanceOf[Vector] + case sv: SparseVector => + var sortedFeatInd = 0 // index into sortedCategoricalFeatureIndices + var k = 0 // index into non-zero elements of sparse vector + while (sortedFeatInd < sortedCategoricalFeatureIndices.size && k < sv.indices.size) { + val featInd = sortedCategoricalFeatureIndices(sortedFeatInd) + if (featInd < sv.indices(k)) { + sortedFeatInd += 1 + } else if (featInd > sv.indices(k)) { + k += 1 + } else { + sv.values(k) = catFeatIdx(featInd)(sv.values(k)) + sortedFeatInd += 1 + k += 1 + } + } + sv.asInstanceOf[Vector] } - Vectors.dense(dv) - }} + } + } } /** * Based on datasets given to [[fit]], decide which features are categorical, * and choose indices for categories. - * @return Feature index. Keys are categorical feature indices (column indices). + * + * Sparsity: This tries to maintain sparsity by treating value 0.0 specially. + * If a categorical feature takes value 0.0, then value 0.0 is given index 0. + * + * @return Feature value index. Keys are categorical feature indices (column indices). * Values are mappings from original features values to 0-based category indices. */ def getCategoricalFeatureIndexes: Map[Int, Map[Double, Int]] = featureValueStats match { - // TODO: It would be ideal to have value 0 set to index 0 to maintain sparsity if possible. 
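Editorial aside (not part of the patch): the replacement code just below assigns category indices per feature, reserving index 0 for the value 0.0 whenever it was observed so that sparse zeros can remain zeros after transform. A simplified standalone sketch of that index assignment in plain Scala (buildCategoryMap is an illustrative name, not from the patch):

    // Build a 0-based category map from the distinct values observed for one feature.
    def buildCategoryMap(observed: Set[Double]): Map[Double, Int] = {
      val sortedNonZero = observed.filter(_ != 0.0).toArray.sorted
      val ordered = if (observed.contains(0.0)) 0.0 +: sortedNonZero else sortedNonZero
      ordered.zipWithIndex.toMap
    }
    // buildCategoryMap(Set(2.0, 0.0, -1.0)) == Map(0.0 -> 0, -1.0 -> 1, 2.0 -> 2)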
case Some(fvs) => fvs.featureValueSets.zipWithIndex .filter(_._1.size <= maxCategories).map { case (featureValues, featureIndex) => - val sortedFeatureValues = featureValues.iterator.toList.sorted + // Get feature values, but remove 0 to treat separately. + // If value 0 exists, give it index 0 to maintain sparsity if possible. + var sortedFeatureValues = featureValues.iterator.filter(_ != 0.0).toArray.sorted + val zeroExists = sortedFeatureValues.size + 1 == featureValues.size + if (zeroExists) { + sortedFeatureValues = 0.0 +: sortedFeatureValues + } val categoryMap: Map[Double, Int] = sortedFeatureValues.zipWithIndex.toMap (featureIndex, categoryMap) }.toMap diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala index e60bb54d9d479..c8479734bd855 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala @@ -17,21 +17,22 @@ package org.apache.spark.mllib.feature -import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.rdd.RDD import org.scalatest.FunSuite class DatasetIndexerSuite extends FunSuite with LocalSparkContext { test("Can fit an empty RDD") { - val rdd = sc.parallelize(Array.empty[Vector]) + val rdd = sc.parallelize(Array.empty[Vector], 2) val datasetIndexer = new DatasetIndexer(maxCategories = 10) datasetIndexer.fit(rdd) } test("If not fitted, throws error when transforming RDD or getting feature indexes") { val points = Seq(Array(1.0, 2.0), Array(0.0, 1.0)) - val rdd = sc.parallelize(points.map(Vectors.dense)) + val rdd = sc.parallelize(points.map(Vectors.dense), 2) val datasetIndexer = new DatasetIndexer(maxCategories = 10) intercept[RuntimeException] { datasetIndexer.transform(rdd) @@ -48,16 +49,16 @@ class DatasetIndexerSuite extends FunSuite with LocalSparkContext { Array(1.0, 2.0), Array(0.0, 1.0, 2.0), Array(-1.0, 3.0)) - val rdd1 = sc.parallelize(points1.map(Vectors.dense)) + val rdd1 = sc.parallelize(points1.map(Vectors.dense), 2) val points2a = Seq( Array(1.0, 2.0), Array(-1.0, 3.0)) - val rdd2a = sc.parallelize(points2a.map(Vectors.dense)) + val rdd2a = sc.parallelize(points2a.map(Vectors.dense), 2) val points2b = Seq( Array(1.0), Array(-1.0)) - val rdd2b = sc.parallelize(points2b.map(Vectors.dense)) - val rdd3 = sc.parallelize(Array.empty[Vector]) + val rdd2b = sc.parallelize(points2b.map(Vectors.dense), 2) + val rdd3 = sc.parallelize(Array.empty[Vector], 2) val datasetIndexer1 = new DatasetIndexer(maxCategories = 10) intercept[RuntimeException] { @@ -80,8 +81,8 @@ class DatasetIndexerSuite extends FunSuite with LocalSparkContext { def testDenseSparse(densePoints: Seq[Vector], sparsePoints: Seq[Vector]): Unit = { assert(densePoints.zip(sparsePoints).forall { case (dv, sv) => dv.toArray === sv.toArray }, s"typo in unit test") - val denseRDD = sc.parallelize(densePoints) - val sparseRDD = sc.parallelize(sparsePoints) + val denseRDD = sc.parallelize(densePoints, 2) + val sparseRDD = sc.parallelize(sparsePoints, 2) val denseDatasetIndexer = new DatasetIndexer(maxCategories = 2) val sparseDatasetIndexer = new DatasetIndexer(maxCategories = 2) @@ -97,53 +98,75 @@ class DatasetIndexerSuite extends FunSuite with LocalSparkContext { "Categorical feature value indexes chosen from dense vs. 
sparse vectors did not match.") } - val densePoints1 = Seq( - Array(1.0, 2.0, 0.0), - Array(0.0, 1.0, 2.0), - Array(0.0, 0.0, -1.0), - Array(1.0, 3.0, 2.0)).map(Vectors.dense) - val sparsePoints1 = Seq( - Vectors.sparse(3, Array(0, 1), Array(1.0, 2.0)), - Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)), - Vectors.sparse(3, Array(2), Array(-1.0)), - Vectors.sparse(3, Array(0, 1, 2), Array(1.0, 3.0, 2.0))) - testDenseSparse(densePoints1, sparsePoints1) - - val densePoints2 = Seq( - Array(1.0, 1.0, 0.0), - Array(0.0, 1.0, 0.0), - Array(-1.0, 1.0, 0.0)).map(Vectors.dense) - val sparsePoints2 = Seq( - Vectors.sparse(3, Array(0, 1), Array(1.0, 1.0)), - Vectors.sparse(3, Array(1), Array(1.0)), - Vectors.sparse(3, Array(0, 1), Array(-1.0, 1.0))) - testDenseSparse(densePoints2, sparsePoints2) + testDenseSparse(DatasetIndexerSuite.densePoints1, DatasetIndexerSuite.sparsePoints1) + testDenseSparse(DatasetIndexerSuite.densePoints2, DatasetIndexerSuite.sparsePoints2) } - test("Builds correct categorical feature value index") { - def checkCategoricalFeatureIndex(values: Seq[Double], valueIndex: Map[Double, Int]): Unit = { - val valSet = values.toSet - assert(valueIndex.keys.toSet === valSet) - assert(valueIndex.values.toSet === Range(0, valSet.size).toSet) + test("Builds correct categorical feature value index, and transform correctly") { + def checkCategoricalFeatureIndex( + rdd: RDD[Vector], + maxCategories: Int, + categoricalFeatures: Set[Int]): Unit = { + val datasetIndexer = new DatasetIndexer(maxCategories = maxCategories) + datasetIndexer.fit(rdd) + val featureIndex = datasetIndexer.getCategoricalFeatureIndexes + assert(featureIndex.keys.toSet === categoricalFeatures) + val indexedRDD = datasetIndexer.transform(rdd) + categoricalFeatures.foreach { catFeature => + val origValueSet = rdd.collect().map(_(catFeature)).toSet + val targetValueIndexSet = Range(0, origValueSet.size).toSet + val valueIndex = featureIndex(catFeature) + assert(valueIndex.keys.toSet === origValueSet) + assert(valueIndex.values.toSet === targetValueIndexSet) + if (origValueSet.contains(0.0)) { + assert(valueIndex(0.0) === 0) // value 0 gets index 0 + } + // Check transformed data + assert(indexedRDD.map(_(catFeature)).collect().toSet === targetValueIndexSet) + } } - val points = Seq( - Array(1.0, 2.0, 0.0), - Array(0.0, 1.0, 2.0), - Array(0.0, 0.0, -1.0), - Array(1.0, 3.0, 2.0)).map(Vectors.dense) - val rdd = sc.parallelize(points, 2) - - val datasetIndexer2 = new DatasetIndexer(maxCategories = 2) - datasetIndexer2.fit(rdd) - val featureIndex2 = datasetIndexer2.getCategoricalFeatureIndexes - assert(featureIndex2.keys.toSet === Set(0)) - checkCategoricalFeatureIndex(points.map(_(0)), featureIndex2(0)) - - val datasetIndexer3 = new DatasetIndexer(maxCategories = 3) - datasetIndexer3.fit(rdd) - val featureIndex3 = datasetIndexer3.getCategoricalFeatureIndexes - assert(featureIndex3.keys.toSet === Set(0, 2)) - checkCategoricalFeatureIndex(points.map(_(0)), featureIndex3(0)) - checkCategoricalFeatureIndex(points.map(_(2)), featureIndex3(2)) + + val rdd1 = sc.parallelize(DatasetIndexerSuite.densePoints1, 2) + checkCategoricalFeatureIndex(rdd1, maxCategories = 2, categoricalFeatures = Set(0)) + checkCategoricalFeatureIndex(rdd1, maxCategories = 3, categoricalFeatures = Set(0, 2)) + + val rdd2 = sc.parallelize(DatasetIndexerSuite.densePoints2, 2) + checkCategoricalFeatureIndex(rdd2, maxCategories = 2, categoricalFeatures = Set(1, 2)) } + + test("Maintain sparsity for sparse vectors") { + def checkSparsity(points: Seq[Vector], 
maxCategories: Int): Unit = { + val rdd = sc.parallelize(points, 2) + val datasetIndexer = new DatasetIndexer(maxCategories = maxCategories) + datasetIndexer.fit(rdd) + val indexedPoints = datasetIndexer.transform(rdd).collect() + points.zip(indexedPoints).foreach { case (orig: SparseVector, indexed: SparseVector) => + assert(orig.indices.size == indexed.indices.size) + } + } + checkSparsity(DatasetIndexerSuite.sparsePoints1, maxCategories = 2) + checkSparsity(DatasetIndexerSuite.sparsePoints2, maxCategories = 2) + } +} + +object DatasetIndexerSuite { + private val densePoints1 = Seq( + Array(1.0, 2.0, 0.0), + Array(0.0, 1.0, 2.0), + Array(0.0, 0.0, -1.0), + Array(1.0, 3.0, 2.0)).map(Vectors.dense) + private val sparsePoints1 = Seq( + Vectors.sparse(3, Array(0, 1), Array(1.0, 2.0)), + Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)), + Vectors.sparse(3, Array(2), Array(-1.0)), + Vectors.sparse(3, Array(0, 1, 2), Array(1.0, 3.0, 2.0))) + + private val densePoints2 = Seq( + Array(1.0, 1.0, 0.0), + Array(0.0, 1.0, 0.0), + Array(-1.0, 1.0, 0.0)).map(Vectors.dense) + private val sparsePoints2 = Seq( + Vectors.sparse(3, Array(0, 1), Array(1.0, 1.0)), + Vectors.sparse(3, Array(1), Array(1.0)), + Vectors.sparse(3, Array(0, 1), Array(-1.0, 1.0))) } From 3f041f87fec29f60baf4f9c997c321c707190be2 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Fri, 31 Oct 2014 14:34:49 -0700 Subject: [PATCH 06/13] Final cleanups for DatasetIndexer --- .../spark/mllib/feature/DatasetIndexer.scala | 63 +++++++++++-------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala index 0767ed9796947..b7dd4e5d88f58 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala @@ -19,7 +19,7 @@ package org.apache.spark.mllib.feature import org.apache.spark.Logging import org.apache.spark.annotation.Experimental -import org.apache.spark.mllib.linalg.{Vectors, DenseVector, SparseVector, Vector} +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector} import org.apache.spark.rdd.RDD import org.apache.spark.util.collection.OpenHashSet @@ -43,15 +43,22 @@ import org.apache.spark.util.collection.OpenHashSet * val indexedData2: RDD[Vector] = datasetIndexer.transform(myData2) * val categoricalFeaturesInfo: Map[Int, Int] = datasetIndexer.getCategoricalFeaturesInfo() * - * TODO: Add option for transform: defaultForUnknownValue (default index for unknown category). - * * TODO: Add warning if a categorical feature has only 1 category. + * + * TODO: Add option for allowing unknown categories: + * Parameter allowUnknownCategories: + * If true, then handle unknown categories during `transform` + * by assigning them to an extra category index. + * That unknown category index will be the largest index; + * e.g., if 5 categories are found during `fit`, then any + * unknown categories will be assigned index 5. + * + * @param maxCategories Threshold for the number of values a categorical feature can take. + * If a feature is found to have > maxCategories values, then it is + * declared continuous. 
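+ * For example (editorial illustration, not part of the original patch): with maxCategories = 4, a column in
+ * which only the values {0.0, 1.0, 2.0} are ever observed is indexed as a categorical feature with 3
+ * categories, while a column with 5 or more distinct values is treated as continuous.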
*/ @Experimental -class DatasetIndexer( - val maxCategories: Int, - val ignoreUnrecognizedCategories: Boolean = true) - extends Logging with Serializable { +class DatasetIndexer(val maxCategories: Int) extends Logging with Serializable { require(maxCategories > 1, s"DatasetIndexer given maxCategories = $maxCategories, but requires maxCategories > 1.") @@ -69,6 +76,8 @@ class DatasetIndexer( def merge(other: FeatureValueStats): FeatureValueStats = { featureValueSets.zip(other.featureValueSets).foreach { case (fvs1, fvs2) => fvs2.iterator.foreach { val2 => + // Once we have found > maxCategories values, we know the feature is continuous + // and do not need to collect more values for it. if (fvs1.size <= maxCategories) fvs1.add(val2) } } @@ -122,20 +131,23 @@ class DatasetIndexer( * can change the behavior of [[transform]] and [[getCategoricalFeatureIndexes]]. * It is best to [[fit]] on all datasets before calling [[transform]] on any. * + * Note: To run this on an RDD[Double], convert to Vector via `data.map(Vectors.dense(_))`. + * * @param data Dataset with equal-length vectors. * NOTE: A single instance of [[DatasetIndexer]] must always be given vectors of * the same length. If given non-matching vectors, this method will throw an error. */ def fit(data: RDD[Vector]): Unit = { + // For each partition, get (featureValueStats, newNumFeatures). - // If all vectors have the same length, then newNumFeatures = -1. + // If all vectors have the same length, then newNumFeatures = None. // If a vector with a new length is found, then newNumFeatures is set to that length. - val partitionFeatureValueSets: RDD[(Option[FeatureValueStats], Int)] = + val partitionFeatureValueSets: RDD[(Option[FeatureValueStats], Option[Int])] = data.mapPartitions { iter => // Make local copy of featureValueStats. // This will be None initially if this is the first dataset to be fitted. var localFeatureValueStats: Option[FeatureValueStats] = featureValueStats - var localNumFeatures: Int = -1 + var newNumFeatures: Option[Int] = None // TODO: Track which features are known to be continuous already, and do not bother // updating counts for them. Probably store featureValueStats in a linked list. 
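Editorial aside (not part of the patch): the per-partition pass below collects, for each column, the set of distinct values seen so far, capped at maxCategories + 1 entries so that clearly continuous columns stop accumulating state; the per-partition results are then merged by fold. A simplified standalone sketch of that aggregation, using plain Scala collections in place of RDDs and OpenHashSet (collectDistinct and mergeDistinct are illustrative names, not from the patch):

    import scala.collection.mutable

    // Collect up to (maxCategories + 1) distinct values per column from one partition of rows.
    def collectDistinct(
        rows: Iterator[Array[Double]],
        numFeatures: Int,
        maxCategories: Int): Array[mutable.Set[Double]] = {
      val sets = Array.fill(numFeatures)(mutable.Set.empty[Double])
      rows.foreach { row =>
        var i = 0
        while (i < numFeatures) {
          if (sets(i).size <= maxCategories) sets(i) += row(i)
          i += 1
        }
      }
      sets
    }

    // Merge another partition's result into the first, keeping the same cap.
    def mergeDistinct(
        a: Array[mutable.Set[Double]],
        b: Array[mutable.Set[Double]],
        maxCategories: Int): Array[mutable.Set[Double]] = {
      a.zip(b).foreach { case (s1, s2) =>
        s2.foreach { v => if (s1.size <= maxCategories) s1 += v }
      }
      a
    }
    // A column is later treated as categorical iff its set holds at most maxCategories values.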
iter.foreach { @@ -146,7 +158,7 @@ class DatasetIndexer( fvs.addDenseVector(dv) } else { // non-matching vector lengths - localNumFeatures = dv.size + newNumFeatures = Some(dv.size) } case None => localFeatureValueStats = Some(new FeatureValueStats(dv.size, maxCategories)) @@ -159,40 +171,40 @@ class DatasetIndexer( fvs.addSparseVector(sv) } else { // non-matching vector lengths - localNumFeatures = sv.size + newNumFeatures = Some(sv.size) } case None => localFeatureValueStats = Some(new FeatureValueStats(sv.size, maxCategories)) localFeatureValueStats.get.addSparseVector(sv) } } - Iterator((localFeatureValueStats, localNumFeatures)) + Iterator((localFeatureValueStats, newNumFeatures)) } - val (aggFeatureValueStats: Option[FeatureValueStats], newNumFeatures: Int) = - partitionFeatureValueSets.fold((None, -1)) { + val (aggFeatureValueStats: Option[FeatureValueStats], newNumFeatures: Option[Int]) = + partitionFeatureValueSets.fold((None, None)) { case ((Some(fvs1), newNumFeatures1), (Some(fvs2), newNumFeatures2)) => if (fvs2.numFeatures == fvs1.numFeatures) { val tmpNumFeatures = (newNumFeatures1, newNumFeatures2) match { - case (-1, -1) => -1 // good: vector lengths match - case (-1, _) => newNumFeatures2 + case (None, None) => None // good: vector lengths match + case (None, _) => newNumFeatures2 case (_, _) => newNumFeatures1 } (Some(fvs1.merge(fvs2)), tmpNumFeatures) } else { // non-matching vector lengths - (Some(fvs1), fvs2.numFeatures) + (Some(fvs1), Some(fvs2.numFeatures)) } - case ((Some(fvs1), newNumFeatures1), (None, -1)) => + case ((Some(fvs1), newNumFeatures1), (None, None)) => (Some(fvs1), newNumFeatures1) - case ((None, -1), (Some(fvs2), newNumFeatures2)) => + case ((None, None), (Some(fvs2), newNumFeatures2)) => (Some(fvs2), newNumFeatures2) - case ((None, -1), (None, -1)) => - (None, -1) + case ((None, None), (None, None)) => + (None, None) } - if (newNumFeatures != -1) { + if (newNumFeatures.nonEmpty) { throw new RuntimeException("DatasetIndexer given records of non-matching length." + s" Found records with length ${aggFeatureValueStats.get.numFeatures} and length" + - s" $newNumFeatures") + s" ${newNumFeatures.get}") } (featureValueStats, aggFeatureValueStats) match { case (Some(origFVS), Some(newFVS)) => @@ -209,7 +221,7 @@ class DatasetIndexer( * Categorical features are mapped to their feature value indices. * Continuous features (columns) are left unchanged. * - * This currently converts sparse vectors to dense ones. + * Note: To run this on an RDD[Double], convert to Vector via `data.map(Vectors.dense(_))`. * * @param data Dataset with equal-length vectors. * NOTE: A single instance of [[DatasetIndexer]] must always be given vectors of @@ -276,5 +288,4 @@ class DatasetIndexer( throw new RuntimeException("DatasetIndexer.getCategoricalFeatureIndexes called," + " but no datasets have been indexed via fit() yet.") } - } From 6a2f55322206275679630dbb86b1c0bedf986f01 Mon Sep 17 00:00:00 2001 From: "Joseph K. 
Bradley" Date: Tue, 4 Nov 2014 14:15:11 -0800 Subject: [PATCH 07/13] Updated TODO for allowUnknownCategories --- .../org/apache/spark/mllib/feature/DatasetIndexer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala index b7dd4e5d88f58..43edaebf53b75 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala @@ -49,9 +49,9 @@ import org.apache.spark.util.collection.OpenHashSet * Parameter allowUnknownCategories: * If true, then handle unknown categories during `transform` * by assigning them to an extra category index. - * That unknown category index will be the largest index; - * e.g., if 5 categories are found during `fit`, then any - * unknown categories will be assigned index 5. + * That unknown category index should be index 1; this will allow maintaining sparsity + * (reserving index 0 for value 0.0), and it will allow category indices to remain fixed + * even if more categories are added later. * * @param maxCategories Threshold for the number of values a categorical feature can take. * If a feature is found to have > maxCategories values, then it is From 6d8f3f1a015b9f7ecebb098c9988e9c855931a8d Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Wed, 25 Feb 2015 23:27:13 -0800 Subject: [PATCH 08/13] Added partly done DatasetIndexer to spark.ml --- .../spark/ml/feature/DatasetIndexer.scala | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/feature/DatasetIndexer.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/DatasetIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/DatasetIndexer.scala new file mode 100644 index 0000000000000..2027e2986eed7 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/DatasetIndexer.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.ml.{UnaryTransformer, Estimator, Model} +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.feature.{DatasetIndexer => OldDatasetIndexer} +import org.apache.spark.sql.SchemaRDD + + +private[ml] trait DatasetIndexerParams extends Params with HasInputCol with HasOutputCol { + + val maxCategories = new IntParam(this, "maxCategories", + "Threshold for the number of values a categorical feature can take." 
+ + " If a feature is found to have > maxCategories values, then it is declared continuous.", + Some(20)) + + def getMaxCategories: Int = get(maxCategories) +} + +/** + * :: Experimental :: + * Class for indexing columns in a dataset. + * + * This helps process a dataset of unknown vectors into a dataset with some continuous features + * and some categorical features. The choice between continuous and categorical is based upon + * a maxCategories parameter. + * + * This can also map categorical feature values to 0-based indices. + * + * Usage: + * val myData1: RDD[Vector] = ... + * val myData2: RDD[Vector] = ... + * val datasetIndexer = new DatasetIndexer(maxCategories) + * datasetIndexer.fit(myData1) + * val indexedData1: RDD[Vector] = datasetIndexer.transform(myData1) + * datasetIndexer.fit(myData2) + * val indexedData2: RDD[Vector] = datasetIndexer.transform(myData2) + * val categoricalFeaturesInfo: Map[Int, Int] = datasetIndexer.getCategoricalFeaturesInfo() + * + * TODO: Add warning if a categorical feature has only 1 category. + * + * TODO: Add option for allowing unknown categories: + * Parameter allowUnknownCategories: + * If true, then handle unknown categories during `transform` + * by assigning them to an extra category index. + * That unknown category index should be index 1; this will allow maintaining sparsity + * (reserving index 0 for value 0.0), and it will allow category indices to remain fixed + * even if more categories are added later. + */ +class DatasetIndexer extends Estimator[DatasetIndexerModel] with DatasetIndexerParams { + + def setMaxCategories(value: Int): this.type = set(maxCategories, value) + + override def fit(dataset: SchemaRDD, paramMap: ParamMap): DatasetIndexerModel = { + val map = this.paramMap ++ paramMap + val indexer = new OldDatasetIndexer(maxCategories = map(maxCategories)) + + } +} + +class DatasetIndexerModel extends Model[DatasetIndexerModel] with DatasetIndexerParams { + + override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = { + + } +} From 286d22104e19585368dd83749a15a1409a9a53cf Mon Sep 17 00:00:00 2001 From: "Joseph K. 
Bradley" Date: Fri, 3 Apr 2015 14:08:53 -0700 Subject: [PATCH 09/13] Reworked DatasetIndexer for spark.ml API, and renamed it to VectorIndexer --- .../spark/ml/feature/DatasetIndexer.scala | 82 ----- .../spark/ml/feature/VectorIndexer.scala | 312 ++++++++++++++++++ .../org/apache/spark/ml/param/params.scala | 20 +- .../spark/mllib/feature/DatasetIndexer.scala | 291 ---------------- .../spark/ml/feature/FeatureTests.scala | 27 ++ .../spark/ml/feature/NormalizerSuite.scala | 2 +- .../spark/ml/feature/VectorIndexerSuite.scala | 186 +++++++++++ .../mllib/feature/DatasetIndexerSuite.scala | 172 ---------- 8 files changed, 540 insertions(+), 552 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/ml/feature/DatasetIndexer.scala create mode 100644 mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala create mode 100644 mllib/src/test/scala/org/apache/spark/ml/feature/FeatureTests.scala create mode 100644 mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala delete mode 100644 mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/DatasetIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/DatasetIndexer.scala deleted file mode 100644 index 2027e2986eed7..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/DatasetIndexer.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ml.feature - -import org.apache.spark.ml.{UnaryTransformer, Estimator, Model} -import org.apache.spark.ml.param._ -import org.apache.spark.mllib.feature.{DatasetIndexer => OldDatasetIndexer} -import org.apache.spark.sql.SchemaRDD - - -private[ml] trait DatasetIndexerParams extends Params with HasInputCol with HasOutputCol { - - val maxCategories = new IntParam(this, "maxCategories", - "Threshold for the number of values a categorical feature can take." + - " If a feature is found to have > maxCategories values, then it is declared continuous.", - Some(20)) - - def getMaxCategories: Int = get(maxCategories) -} - -/** - * :: Experimental :: - * Class for indexing columns in a dataset. - * - * This helps process a dataset of unknown vectors into a dataset with some continuous features - * and some categorical features. The choice between continuous and categorical is based upon - * a maxCategories parameter. - * - * This can also map categorical feature values to 0-based indices. - * - * Usage: - * val myData1: RDD[Vector] = ... - * val myData2: RDD[Vector] = ... 
- * val datasetIndexer = new DatasetIndexer(maxCategories) - * datasetIndexer.fit(myData1) - * val indexedData1: RDD[Vector] = datasetIndexer.transform(myData1) - * datasetIndexer.fit(myData2) - * val indexedData2: RDD[Vector] = datasetIndexer.transform(myData2) - * val categoricalFeaturesInfo: Map[Int, Int] = datasetIndexer.getCategoricalFeaturesInfo() - * - * TODO: Add warning if a categorical feature has only 1 category. - * - * TODO: Add option for allowing unknown categories: - * Parameter allowUnknownCategories: - * If true, then handle unknown categories during `transform` - * by assigning them to an extra category index. - * That unknown category index should be index 1; this will allow maintaining sparsity - * (reserving index 0 for value 0.0), and it will allow category indices to remain fixed - * even if more categories are added later. - */ -class DatasetIndexer extends Estimator[DatasetIndexerModel] with DatasetIndexerParams { - - def setMaxCategories(value: Int): this.type = set(maxCategories, value) - - override def fit(dataset: SchemaRDD, paramMap: ParamMap): DatasetIndexerModel = { - val map = this.paramMap ++ paramMap - val indexer = new OldDatasetIndexer(maxCategories = map(maxCategories)) - - } -} - -class DatasetIndexerModel extends Model[DatasetIndexerModel] with DatasetIndexerParams { - - override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = { - - } -} diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala new file mode 100644 index 0000000000000..22abc0b4ba368 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.param.{HasInputCol, HasOutputCol, IntParam, ParamMap, Params} +import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector, VectorUDT} +import org.apache.spark.sql.{Row, DataFrame} +import org.apache.spark.sql.functions.callUDF +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.collection.OpenHashSet + + +/** Private trait for params for VectorIndexer and VectorIndexerModel */ +private[ml] trait VectorIndexerParams extends Params with HasInputCol with HasOutputCol { + + /** + * Threshold for the number of values a categorical feature can take. + * If a feature is found to have > maxCategories values, then it is declared continuous. + * + * (default = 20) + */ + val maxCategories = new IntParam(this, "maxCategories", + "Threshold for the number of values a categorical feature can take." 
+ + " If a feature is found to have > maxCategories values, then it is declared continuous.", + Some(20)) + + /** @group getParam */ + def getMaxCategories: Int = get(maxCategories) + + protected def validateAndTransformSchema(schema: StructType, paramMap: ParamMap): StructType = { + val map = this.paramMap ++ paramMap + val dataType = new VectorUDT + val className = this.getClass.getSimpleName + require(map.contains(inputCol), s"$className requires input column parameter: $inputCol") + require(map.contains(outputCol), s"$className requires output column parameter: $outputCol") + checkInputColumn(schema, map(inputCol), dataType) + addOutputColumn(schema, map(outputCol), dataType) + } +} + +/** + * :: AlphaComponent :: + * + * Class for indexing categorical feature columns in a dataset of [[Vector]]. + * + * This has 2 usage modes: + * - Automatically identify categorical features (default behavior) + * - This helps process a dataset of unknown vectors into a dataset with some continuous + * features and some categorical features. The choice between continuous and categorical + * is based upon a maxCategories parameter. + * - Set maxCategories to the maximum number of categorical any categorical feature should have. + * - Index all features, if all features are categorical + * - If maxCategories is set to be very large, then this will build an index of unique + * values for all features. + * - Warning: This can cause problems if features are continuous since this will collect ALL + * unique values to the driver. + * + * This returns a model which can transform categorical features to use 0-based indices. + * + * Index stability: + * - This is not guaranteed to choose the same category index across multiple runs. + * - If a categorical feature includes value 0, then this is guaranteed to map value 0 to index 0. + * This maintains vector sparsity. + * - More stability may be added in the future. + * + * TODO: Add warning if a categorical feature has only 1 category? + * + * TODO: Add option for allowing unknown categories: + * Parameter allowUnknownCategories: + * If true, then handle unknown categories during `transform` + * by assigning them to an extra category index. + * That unknown category index should be index 1; this will allow maintaining sparsity + * (reserving index 0 for value 0.0), and it will allow category indices to remain fixed + * even if more categories are added later. 
+ */ +@AlphaComponent +class VectorIndexer extends Estimator[VectorIndexerModel] with VectorIndexerParams { + + /** @group setParam */ + def setMaxCategories(value: Int): this.type = { + require(value > 1, + s"DatasetIndexer given maxCategories = value, but requires maxCategories > 1.") + set(maxCategories, value) + } + + /** @group setParam */ + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + def setOutputCol(value: String): this.type = set(outputCol, value) + + override def fit(dataset: DataFrame, paramMap: ParamMap): VectorIndexerModel = { + transformSchema(dataset.schema, paramMap, logging = true) + val map = this.paramMap ++ paramMap + val firstRow = dataset.select(map(inputCol)).take(1) + require(firstRow.size == 1, s"VectorIndexer cannot be fit on an empty dataset.") + val numFeatures = firstRow(0).getAs[Vector](0).size + val vectorDataset = dataset.select(map(inputCol)).map { case Row(v: Vector) => v } + val maxCats = map(maxCategories) + val categoryStats: VectorIndexer.CategoryStats = vectorDataset.mapPartitions { iter => + val localCatStats = new VectorIndexer.CategoryStats(numFeatures, maxCats) + iter.foreach(localCatStats.addVector) + Iterator(localCatStats) + }.reduce((stats1, stats2) => stats1.merge(stats2)) + val model = new VectorIndexerModel(this, map, numFeatures, categoryStats.getCategoryMaps) + Params.inheritValues(map, this, model) + model + } + + override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + validateAndTransformSchema(schema, paramMap) + } +} + +private object VectorIndexer { + + /** + * Helper class for tracking unique values for each feature. + * + * TODO: Track which features are known to be continuous already; do not update counts for them. + * + * @param numFeatures This class fails if it encounters a Vector whose length is not numFeatures. + * @param maxCategories This class caps the number of unique values collected at maxCategories. + */ + class CategoryStats(private val numFeatures: Int, private val maxCategories: Int) + extends Serializable { + + /** featureValueSets[feature index] = set of unique values */ + private val featureValueSets = + Array.fill[OpenHashSet[Double]](numFeatures)(new OpenHashSet[Double]()) + + /** + * Merge with another instance, modifying this instance. + * @param other Other instance, not modified + * @return This instance, modified + */ + def merge(other: CategoryStats): CategoryStats = { + featureValueSets.zip(other.featureValueSets).foreach { case (thisValSet, otherValSet) => + otherValSet.iterator.foreach { x => + // Once we have found > maxCategories values, we know the feature is continuous + // and do not need to collect more values for it. + if (thisValSet.size <= maxCategories) thisValSet.add(x) + } + } + this + } + + /** Add a new vector to this index, updating sets of unique feature values */ + def addVector(v: Vector): Unit = { + require(v.size == numFeatures, s"VectorIndexer expected $numFeatures features but" + + s" found vector of size ${v.size}.") + v match { + case dv: DenseVector => addDenseVector(dv) + case sv: SparseVector => addSparseVector(sv) + } + } + + /** + * Based on stats collected, decide which features are categorical, + * and choose indices for categories. + * + * Sparsity: This tries to maintain sparsity by treating value 0.0 specially. + * If a categorical feature takes value 0.0, then value 0.0 is given index 0. + * + * @return Feature value index. Keys are categorical feature indices (column indices). 
+ * Values are mappings from original features values to 0-based category indices. + */ + def getCategoryMaps: Map[Int, Map[Double, Int]] = { + // Filter out features which are declared continuous. + featureValueSets.zipWithIndex.filter(_._1.size <= maxCategories).map { + case (featureValues: OpenHashSet[Double], featureIndex: Int) => + // Get feature values, but remove 0 to treat separately. + // If value 0 exists, give it index 0 to maintain sparsity if possible. + var sortedFeatureValues = featureValues.iterator.filter(_ != 0.0).toArray.sorted + val zeroExists = sortedFeatureValues.size + 1 == featureValues.size + if (zeroExists) { + sortedFeatureValues = 0.0 +: sortedFeatureValues + } + val categoryMap: Map[Double, Int] = sortedFeatureValues.zipWithIndex.toMap + (featureIndex, categoryMap) + }.toMap + } + + private def addDenseVector(dv: DenseVector): Unit = { + var i = 0 + while (i < dv.size) { + if (featureValueSets(i).size <= maxCategories) { + featureValueSets(i).add(dv(i)) + } + i += 1 + } + } + + private def addSparseVector(sv: SparseVector): Unit = { + // TODO: This might be able to handle 0's more efficiently. + var vecIndex = 0 // index into vector + var k = 0 // index into non-zero elements + while (vecIndex < sv.size) { + val featureValue = if (k < sv.indices.size && vecIndex == sv.indices(k)) { + k += 1 + sv.values(k - 1) + } else { + 0.0 + } + if (featureValueSets(vecIndex).size <= maxCategories) { + featureValueSets(vecIndex).add(featureValue) + } + vecIndex += 1 + } + } + } +} + +/** + * :: AlphaComponent :: + * + * Transform categorical features to use 0-based indices instead of their original values. + * - Categorical features are mapped to their feature value indices. + * - Continuous features (columns) are left unchanged. + * + * This maintains vector sparsity. + * + * Note: If this model was created for vectors of length numFeatures, + * this model's transform method must be given vectors of length numFeatures. + * + * @param numFeatures Number of features, i.e., length of Vectors which this transforms + * @param categoryMaps Feature value index. Keys are categorical feature indices (column indices). + * Values are maps from original features values to 0-based category indices. + */ +@AlphaComponent +class VectorIndexerModel private[ml] ( + override val parent: VectorIndexer, + override val fittingParamMap: ParamMap, + val numFeatures: Int, + val categoryMaps: Map[Int, Map[Double, Int]]) + extends Model[VectorIndexerModel] with VectorIndexerParams { + + // TODO: Check more carefully about whether this whole class will be included in a closure. + + private val transformFunc: Vector => Vector = { + val sortedCategoricalFeatureIndices = categoryMaps.keys.toArray.sorted + val localVectorMap = categoryMaps + val f: Vector => Vector = { + case dv: DenseVector => + val tmpv = dv.copy + localVectorMap.foreach { case (featureIndex: Int, categoryMap: Map[Double, Int]) => + tmpv.values(featureIndex) = categoryMap(tmpv(featureIndex)) + } + tmpv + case sv: SparseVector => + // We use the fact that categorical value 0 is always mapped to index 0. 
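+          // Features absent from the sparse vector's stored indices have value 0.0 and so keep
+          // index 0; only the stored (non-zero) entries of categorical features need remapping.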
+ val tmpv = sv.copy + var catFeatureIdx = 0 // index into sortedCategoricalFeatureIndices + var k = 0 // index into non-zero elements of sparse vector + while (catFeatureIdx < sortedCategoricalFeatureIndices.size && k < tmpv.indices.size) { + val featureIndex = sortedCategoricalFeatureIndices(catFeatureIdx) + if (featureIndex < tmpv.indices(k)) { + catFeatureIdx += 1 + } else if (featureIndex > tmpv.indices(k)) { + k += 1 + } else { + tmpv.values(k) = localVectorMap(featureIndex)(tmpv.values(k)) + catFeatureIdx += 1 + k += 1 + } + } + tmpv + } + f + } + + /** @group setParam */ + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + def setOutputCol(value: String): this.type = set(outputCol, value) + + override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { + transformSchema(dataset.schema, paramMap, logging = true) + val map = this.paramMap ++ paramMap + val newCol = callUDF(transformFunc, new VectorUDT, dataset(map(inputCol))) + // For now, just check the first row of inputCol for vector length. + val firstRow = dataset.select(map(inputCol)).take(1) + if (firstRow.size != 0) { + val actualNumFeatures = firstRow(0).getAs[Vector](0).size + require(numFeatures == actualNumFeatures, "VectorIndexerModel expected vector of length" + + s" $numFeatures but found length $actualNumFeatures") + } + dataset.withColumn(map(outputCol), newCol) + } + + override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + validateAndTransformSchema(schema, paramMap) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index 17ece897a6c55..7d5178d0abb2d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -198,23 +198,31 @@ trait Params extends Identifiable with Serializable { /** * Check whether the given schema contains an input column. - * @param colName Parameter name for the input column. - * @param dataType SQL DataType of the input column. + * @param colName Input column name + * @param dataType Input column DataType */ protected def checkInputColumn(schema: StructType, colName: String, dataType: DataType): Unit = { val actualDataType = schema(colName).dataType - require(actualDataType.equals(dataType), - s"Input column $colName must be of type $dataType" + - s" but was actually $actualDataType. Column param description: ${getParam(colName)}") + require(actualDataType.equals(dataType), s"Input column $colName must be of type $dataType" + + s" but was actually $actualDataType. Column param description: ${getParam(colName)}") } + /** + * Add an output column to the given schema. + * This fails if the given output column already exists. + * @param schema Initial schema (not modified) + * @param colName Output column name. If this column name is an empy String "", this method + * returns the initial schema, unchanged. This allows users to disable output + * columns. 
+ * @param dataType Output column DataType + */ protected def addOutputColumn( schema: StructType, colName: String, dataType: DataType): StructType = { if (colName.length == 0) return schema val fieldNames = schema.fieldNames - require(!fieldNames.contains(colName), s"Prediction column $colName already exists.") + require(!fieldNames.contains(colName), s"Output column $colName already exists.") val outputFields = schema.fields ++ Seq(StructField(colName, dataType, nullable = false)) StructType(outputFields) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala deleted file mode 100644 index 43edaebf53b75..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.feature - -import org.apache.spark.Logging -import org.apache.spark.annotation.Experimental -import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector} -import org.apache.spark.rdd.RDD -import org.apache.spark.util.collection.OpenHashSet - -/** - * :: Experimental :: - * Class for indexing columns in a dataset. - * - * This helps process a dataset of unknown vectors into a dataset with some continuous features - * and some categorical features. The choice between continuous and categorical is based upon - * a maxCategories parameter. - * - * This can also map categorical feature values to 0-based indices. - * - * Usage: - * val myData1: RDD[Vector] = ... - * val myData2: RDD[Vector] = ... - * val datasetIndexer = new DatasetIndexer(maxCategories) - * datasetIndexer.fit(myData1) - * val indexedData1: RDD[Vector] = datasetIndexer.transform(myData1) - * datasetIndexer.fit(myData2) - * val indexedData2: RDD[Vector] = datasetIndexer.transform(myData2) - * val categoricalFeaturesInfo: Map[Int, Int] = datasetIndexer.getCategoricalFeaturesInfo() - * - * TODO: Add warning if a categorical feature has only 1 category. - * - * TODO: Add option for allowing unknown categories: - * Parameter allowUnknownCategories: - * If true, then handle unknown categories during `transform` - * by assigning them to an extra category index. - * That unknown category index should be index 1; this will allow maintaining sparsity - * (reserving index 0 for value 0.0), and it will allow category indices to remain fixed - * even if more categories are added later. - * - * @param maxCategories Threshold for the number of values a categorical feature can take. - * If a feature is found to have > maxCategories values, then it is - * declared continuous. 
- */ -@Experimental -class DatasetIndexer(val maxCategories: Int) extends Logging with Serializable { - - require(maxCategories > 1, - s"DatasetIndexer given maxCategories = $maxCategories, but requires maxCategories > 1.") - - private class FeatureValueStats(val numFeatures: Int, val maxCategories: Int) - extends Serializable { - - val featureValueSets = Array.fill[OpenHashSet[Double]](numFeatures)(new OpenHashSet[Double]()) - - /** - * Merge other [[FeatureValueStats]] into this instance, modifying this instance. - * @param other Other instance. Not modified. - * @return This instance - */ - def merge(other: FeatureValueStats): FeatureValueStats = { - featureValueSets.zip(other.featureValueSets).foreach { case (fvs1, fvs2) => - fvs2.iterator.foreach { val2 => - // Once we have found > maxCategories values, we know the feature is continuous - // and do not need to collect more values for it. - if (fvs1.size <= maxCategories) fvs1.add(val2) - } - } - this - } - - def addDenseVector(dv: DenseVector): Unit = { - var i = 0 - while (i < dv.size) { - if (featureValueSets(i).size <= maxCategories) { - featureValueSets(i).add(dv(i)) - } - i += 1 - } - } - - def addSparseVector(sv: SparseVector): Unit = { - // TODO: This could be made more efficient. - var vecIndex = 0 // index into vector - var nzIndex = 0 // index into non-zero elements - while (vecIndex < sv.size) { - val featureValue = if (nzIndex < sv.indices.size && vecIndex == sv.indices(nzIndex)) { - nzIndex += 1 - sv.values(nzIndex - 1) - } else { - 0.0 - } - if (featureValueSets(vecIndex).size <= maxCategories) { - featureValueSets(vecIndex).add(featureValue) - } - vecIndex += 1 - } - } - - } - - /** - * Array (over features) of sets of distinct feature values (up to maxCategories values). - * Null values in array indicate feature has been determined to be continuous. - * - * Once the number of elements in a feature's set reaches maxCategories + 1, - * then it is declared continuous, and we stop adding elements. - */ - private var featureValueStats: Option[FeatureValueStats] = None - - /** - * Scans a dataset once and updates statistics about each column. - * The statistics are used to choose categorical features and re-index them. - * - * Warning: Calling this on a new dataset changes the feature statistics and thus - * can change the behavior of [[transform]] and [[getCategoricalFeatureIndexes]]. - * It is best to [[fit]] on all datasets before calling [[transform]] on any. - * - * Note: To run this on an RDD[Double], convert to Vector via `data.map(Vectors.dense(_))`. - * - * @param data Dataset with equal-length vectors. - * NOTE: A single instance of [[DatasetIndexer]] must always be given vectors of - * the same length. If given non-matching vectors, this method will throw an error. - */ - def fit(data: RDD[Vector]): Unit = { - - // For each partition, get (featureValueStats, newNumFeatures). - // If all vectors have the same length, then newNumFeatures = None. - // If a vector with a new length is found, then newNumFeatures is set to that length. - val partitionFeatureValueSets: RDD[(Option[FeatureValueStats], Option[Int])] = - data.mapPartitions { iter => - // Make local copy of featureValueStats. - // This will be None initially if this is the first dataset to be fitted. - var localFeatureValueStats: Option[FeatureValueStats] = featureValueStats - var newNumFeatures: Option[Int] = None - // TODO: Track which features are known to be continuous already, and do not bother - // updating counts for them. 
Probably store featureValueStats in a linked list. - iter.foreach { - case dv: DenseVector => - localFeatureValueStats match { - case Some(fvs) => - if (fvs.numFeatures == dv.size) { - fvs.addDenseVector(dv) - } else { - // non-matching vector lengths - newNumFeatures = Some(dv.size) - } - case None => - localFeatureValueStats = Some(new FeatureValueStats(dv.size, maxCategories)) - localFeatureValueStats.get.addDenseVector(dv) - } - case sv: SparseVector => - localFeatureValueStats match { - case Some(fvs) => - if (fvs.numFeatures == sv.size) { - fvs.addSparseVector(sv) - } else { - // non-matching vector lengths - newNumFeatures = Some(sv.size) - } - case None => - localFeatureValueStats = Some(new FeatureValueStats(sv.size, maxCategories)) - localFeatureValueStats.get.addSparseVector(sv) - } - } - Iterator((localFeatureValueStats, newNumFeatures)) - } - val (aggFeatureValueStats: Option[FeatureValueStats], newNumFeatures: Option[Int]) = - partitionFeatureValueSets.fold((None, None)) { - case ((Some(fvs1), newNumFeatures1), (Some(fvs2), newNumFeatures2)) => - if (fvs2.numFeatures == fvs1.numFeatures) { - val tmpNumFeatures = (newNumFeatures1, newNumFeatures2) match { - case (None, None) => None // good: vector lengths match - case (None, _) => newNumFeatures2 - case (_, _) => newNumFeatures1 - } - (Some(fvs1.merge(fvs2)), tmpNumFeatures) - } else { - // non-matching vector lengths - (Some(fvs1), Some(fvs2.numFeatures)) - } - case ((Some(fvs1), newNumFeatures1), (None, None)) => - (Some(fvs1), newNumFeatures1) - case ((None, None), (Some(fvs2), newNumFeatures2)) => - (Some(fvs2), newNumFeatures2) - case ((None, None), (None, None)) => - (None, None) - } - if (newNumFeatures.nonEmpty) { - throw new RuntimeException("DatasetIndexer given records of non-matching length." + - s" Found records with length ${aggFeatureValueStats.get.numFeatures} and length" + - s" ${newNumFeatures.get}") - } - (featureValueStats, aggFeatureValueStats) match { - case (Some(origFVS), Some(newFVS)) => - origFVS.merge(newFVS) - case (None, Some(newFVS)) => - featureValueStats = Some(newFVS) - case _ => - logDebug("DatasetIndexer.fit(rdd) called on RDD with 0 rows.") - } - } - - /** - * Transforms the given dataset using the indexes returned by [[getCategoricalFeatureIndexes]]. - * Categorical features are mapped to their feature value indices. - * Continuous features (columns) are left unchanged. - * - * Note: To run this on an RDD[Double], convert to Vector via `data.map(Vectors.dense(_))`. - * - * @param data Dataset with equal-length vectors. - * NOTE: A single instance of [[DatasetIndexer]] must always be given vectors of - * the same length. If given non-matching vectors, this method will throw an error. - * @return Dataset with categorical features modified to use 0-based indices. 
- */ - def transform(data: RDD[Vector]): RDD[Vector] = { - val catFeatIdx = getCategoricalFeatureIndexes - data.mapPartitions { iterator => - val sortedCategoricalFeatureIndices = catFeatIdx.keys.toArray.sorted - iterator.map { v: Vector => - v match { - case dv: DenseVector => - catFeatIdx.foreach { case (featureIndex, categoryMap) => - dv.values(featureIndex) = categoryMap(dv(featureIndex)) - } - dv.asInstanceOf[Vector] - case sv: SparseVector => - var sortedFeatInd = 0 // index into sortedCategoricalFeatureIndices - var k = 0 // index into non-zero elements of sparse vector - while (sortedFeatInd < sortedCategoricalFeatureIndices.size && k < sv.indices.size) { - val featInd = sortedCategoricalFeatureIndices(sortedFeatInd) - if (featInd < sv.indices(k)) { - sortedFeatInd += 1 - } else if (featInd > sv.indices(k)) { - k += 1 - } else { - sv.values(k) = catFeatIdx(featInd)(sv.values(k)) - sortedFeatInd += 1 - k += 1 - } - } - sv.asInstanceOf[Vector] - } - } - } - } - - /** - * Based on datasets given to [[fit]], decide which features are categorical, - * and choose indices for categories. - * - * Sparsity: This tries to maintain sparsity by treating value 0.0 specially. - * If a categorical feature takes value 0.0, then value 0.0 is given index 0. - * - * @return Feature value index. Keys are categorical feature indices (column indices). - * Values are mappings from original features values to 0-based category indices. - */ - def getCategoricalFeatureIndexes: Map[Int, Map[Double, Int]] = featureValueStats match { - case Some(fvs) => - fvs.featureValueSets.zipWithIndex - .filter(_._1.size <= maxCategories).map { case (featureValues, featureIndex) => - // Get feature values, but remove 0 to treat separately. - // If value 0 exists, give it index 0 to maintain sparsity if possible. - var sortedFeatureValues = featureValues.iterator.filter(_ != 0.0).toArray.sorted - val zeroExists = sortedFeatureValues.size + 1 == featureValues.size - if (zeroExists) { - sortedFeatureValues = 0.0 +: sortedFeatureValues - } - val categoryMap: Map[Double, Int] = sortedFeatureValues.zipWithIndex.toMap - (featureIndex, categoryMap) - }.toMap - case None => - throw new RuntimeException("DatasetIndexer.getCategoricalFeatureIndexes called," + - " but no datasets have been indexed via fit() yet.") - } -} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureTests.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureTests.scala new file mode 100644 index 0000000000000..94a2eb5c209f5 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureTests.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.feature + +import org.apache.spark.mllib.linalg.Vector + + +private[ml] object FeatureTests { + + case class DataSet(features: Vector) + +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala index a18c335952b96..cee65fe13a06e 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala @@ -19,12 +19,12 @@ package org.apache.spark.ml.feature import org.scalatest.FunSuite +import org.apache.spark.ml.feature.FeatureTests.DataSet import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.sql.{DataFrame, Row, SQLContext} -private case class DataSet(features: Vector) class NormalizerSuite extends FunSuite with MLlibTestSparkContext { diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala new file mode 100644 index 0000000000000..644b45eb5e549 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.feature + +import org.scalatest.FunSuite + +import org.apache.spark.SparkException +import org.apache.spark.ml.feature.FeatureTests.DataSet +import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, SQLContext} + + +class VectorIndexerSuite extends FunSuite with MLlibTestSparkContext { + + @transient var sqlContext: SQLContext = _ + + // identical, of length 3 + @transient var densePoints1: DataFrame = _ + @transient var sparsePoints1: DataFrame = _ + + // identical, of length 2 + @transient var densePoints2: DataFrame = _ + @transient var sparsePoints2: DataFrame = _ + + // different lengths + @transient var badPoints: DataFrame = _ + + override def beforeAll(): Unit = { + super.beforeAll() + + val densePoints1Seq = Seq( + Vectors.dense(1.0, 2.0, 0.0), + Vectors.dense(0.0, 1.0, 2.0), + Vectors.dense(0.0, 0.0, -1.0), + Vectors.dense(1.0, 3.0, 2.0)) + val sparsePoints1Seq = Seq( + Vectors.sparse(3, Array(0, 1), Array(1.0, 2.0)), + Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)), + Vectors.sparse(3, Array(2), Array(-1.0)), + Vectors.sparse(3, Array(0, 1, 2), Array(1.0, 3.0, 2.0))) + + val densePoints2Seq = Seq( + Vectors.dense(1.0, 1.0, 0.0, 1.0), + Vectors.dense(0.0, 1.0, 1.0, 1.0), + Vectors.dense(-1.0, 1.0, 2.0, 0.0)) + val sparsePoints2Seq = Seq( + Vectors.sparse(4, Array(0, 1, 3), Array(1.0, 1.0, 1.0)), + Vectors.sparse(4, Array(1, 2, 3), Array(1.0, 1.0, 1.0)), + Vectors.sparse(4, Array(0, 1, 2), Array(-1.0, 1.0, 2.0))) + + val badPointsSeq = Seq( + Vectors.sparse(2, Array(0, 1), Array(1.0, 1.0)), + Vectors.sparse(3, Array(2), Array(-1.0))) + + // Sanity checks for assumptions made in tests + assert(densePoints1Seq(0).size == sparsePoints1Seq(0).size) + assert(densePoints2Seq(0).size == sparsePoints2Seq(0).size) + assert(densePoints1Seq(0).size != densePoints2Seq(0).size) + def checkPair(dvSeq: Seq[Vector], svSeq: Seq[Vector]): Unit = { + assert(dvSeq.zip(svSeq).forall { case (dv, sv) => dv.toArray === sv.toArray }, + "typo in unit test") + } + checkPair(densePoints1Seq, sparsePoints1Seq) + checkPair(densePoints2Seq, sparsePoints2Seq) + + sqlContext = new SQLContext(sc) + densePoints1 = sqlContext.createDataFrame(sc.parallelize(densePoints1Seq, 2).map(DataSet)) + sparsePoints1 = sqlContext.createDataFrame(sc.parallelize(sparsePoints1Seq, 2).map(DataSet)) + densePoints2 = sqlContext.createDataFrame(sc.parallelize(densePoints2Seq, 2).map(DataSet)) + sparsePoints2 = sqlContext.createDataFrame(sc.parallelize(sparsePoints2Seq, 2).map(DataSet)) + badPoints = sqlContext.createDataFrame(sc.parallelize(badPointsSeq, 2).map(DataSet)) + } + + private def getIndexer: VectorIndexer = + new VectorIndexer().setInputCol("features").setOutputCol("indexed") + + test("Cannot fit an empty DataFrame") { + val rdd = sqlContext.createDataFrame(sc.parallelize(Array.empty[Vector], 2).map(DataSet)) + val vectorIndexer = getIndexer + intercept[IllegalArgumentException] { + vectorIndexer.fit(rdd) + } + } + + test("Throws error when given RDDs with different size vectors") { + val vectorIndexer = getIndexer + val model = vectorIndexer.fit(densePoints1) // vectors of length 3 + model.transform(densePoints1) // should work + model.transform(sparsePoints1) // should work + intercept[IllegalArgumentException] { + model.transform(densePoints2) + println("Did not throw error when fit, transform were called on vectors of different lengths") + } + 
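+    // Fitting vectors of different lengths within one RDD should also fail: the per-vector
+    // length check throws inside a task, which surfaces on the driver as a SparkException.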
intercept[SparkException] { + vectorIndexer.fit(badPoints) + println("Did not throw error when fitting vectors of different lengths in same RDD.") + } + } + + test("Same result with dense and sparse vectors") { + def testDenseSparse(densePoints: DataFrame, sparsePoints: DataFrame): Unit = { + val denseVectorIndexer = getIndexer.setMaxCategories(2) + val sparseVectorIndexer = getIndexer.setMaxCategories(2) + val denseModel = denseVectorIndexer.fit(densePoints) + val sparseModel = sparseVectorIndexer.fit(sparsePoints) + val denseMap = denseModel.categoryMaps + val sparseMap = sparseModel.categoryMaps + assert(denseMap.keys.toSet == sparseMap.keys.toSet, + "Categorical features chosen from dense vs. sparse vectors did not match.") + assert(denseMap == sparseMap, + "Categorical feature value indexes chosen from dense vs. sparse vectors did not match.") + } + testDenseSparse(densePoints1, sparsePoints1) + testDenseSparse(densePoints2, sparsePoints2) + } + + test("Builds valid categorical feature value index, and transform correctly") { + def checkCategoryMaps( + data: DataFrame, + maxCategories: Int, + categoricalFeatures: Set[Int]): Unit = { + val collectedData = data.collect().map(_.getAs[Vector](0)) + try { + val vectorIndexer = getIndexer.setMaxCategories(maxCategories) + val model = vectorIndexer.fit(data) + val categoryMaps = model.categoryMaps + assert(categoryMaps.keys.toSet === categoricalFeatures) // Chose correct categorical features + val indexedRDD: RDD[Vector] = + model.transform(data).select("indexed").map(_.getAs[Vector](0)) + categoricalFeatures.foreach { catFeature: Int => + val origValueSet = collectedData.map(_(catFeature)).toSet + val targetValueIndexSet = Range(0, origValueSet.size).toSet + val catMap = categoryMaps(catFeature) + assert(catMap.keys.toSet === origValueSet) // Correct categories + assert(catMap.values.toSet === targetValueIndexSet) // Correct category indices + if (origValueSet.contains(0.0)) { + assert(catMap(0.0) === 0) // value 0 gets index 0 + } + // Check transformed data + assert(indexedRDD.map(_(catFeature)).collect().toSet === targetValueIndexSet) + } + } catch { + case e: org.scalatest.exceptions.TestFailedException => + println(s"checkCategoryMaps failed for input with maxCategories=$maxCategories," + + s" categoricalFeatures=${categoricalFeatures.mkString(", ")}") + throw e + } + } + checkCategoryMaps(densePoints1, maxCategories = 2, categoricalFeatures = Set(0)) + checkCategoryMaps(densePoints1, maxCategories = 3, categoricalFeatures = Set(0, 2)) + checkCategoryMaps(densePoints2, maxCategories = 2, categoricalFeatures = Set(1, 3)) + } + + test("Maintain sparsity for sparse vectors") { + def checkSparsity(data: DataFrame, maxCategories: Int): Unit = { + val points = data.collect().map(_.getAs[Vector](0)) + val vectorIndexer = getIndexer.setMaxCategories(maxCategories) + val model = vectorIndexer.fit(data) + val indexedPoints = model.transform(data).select("indexed").map(_.getAs[Vector](0)).collect() + points.zip(indexedPoints).foreach { + case (orig: SparseVector, indexed: SparseVector) => + assert(orig.indices.size == indexed.indices.size) + case _ => throw new UnknownError("Unit test has a bug in it.") // should never happen + } + } + checkSparsity(sparsePoints1, maxCategories = 2) + checkSparsity(sparsePoints2, maxCategories = 2) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala deleted file mode 100644 index 
c8479734bd855..0000000000000 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/DatasetIndexerSuite.scala +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.feature - -import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} -import org.apache.spark.mllib.util.LocalSparkContext -import org.apache.spark.rdd.RDD -import org.scalatest.FunSuite - -class DatasetIndexerSuite extends FunSuite with LocalSparkContext { - - test("Can fit an empty RDD") { - val rdd = sc.parallelize(Array.empty[Vector], 2) - val datasetIndexer = new DatasetIndexer(maxCategories = 10) - datasetIndexer.fit(rdd) - } - - test("If not fitted, throws error when transforming RDD or getting feature indexes") { - val points = Seq(Array(1.0, 2.0), Array(0.0, 1.0)) - val rdd = sc.parallelize(points.map(Vectors.dense), 2) - val datasetIndexer = new DatasetIndexer(maxCategories = 10) - intercept[RuntimeException] { - datasetIndexer.transform(rdd) - println("Did not throw error when transforming before fitting.") - } - intercept[RuntimeException] { - datasetIndexer.getCategoricalFeatureIndexes - println("Did not throw error when getting feature indexes before fitting.") - } - } - - test("Throws error when given RDDs with different size vectors") { - val points1 = Seq( - Array(1.0, 2.0), - Array(0.0, 1.0, 2.0), - Array(-1.0, 3.0)) - val rdd1 = sc.parallelize(points1.map(Vectors.dense), 2) - val points2a = Seq( - Array(1.0, 2.0), - Array(-1.0, 3.0)) - val rdd2a = sc.parallelize(points2a.map(Vectors.dense), 2) - val points2b = Seq( - Array(1.0), - Array(-1.0)) - val rdd2b = sc.parallelize(points2b.map(Vectors.dense), 2) - val rdd3 = sc.parallelize(Array.empty[Vector], 2) - - val datasetIndexer1 = new DatasetIndexer(maxCategories = 10) - intercept[RuntimeException] { - datasetIndexer1.fit(rdd1) - println("Did not throw error when fitting vectors of different lengths in same RDD.") - } - val datasetIndexer2 = new DatasetIndexer(maxCategories = 10) - datasetIndexer2.fit(rdd2a) - intercept[RuntimeException] { - datasetIndexer2.fit(rdd2b) - println("Did not throw error when fitting vectors of different lengths in two RDDs.") - } - val datasetIndexer3 = new DatasetIndexer(maxCategories = 10) - datasetIndexer3.fit(rdd3) // does nothing - datasetIndexer3.fit(rdd2a) // should work - } - - test("Same result with dense and sparse vectors") { - - def testDenseSparse(densePoints: Seq[Vector], sparsePoints: Seq[Vector]): Unit = { - assert(densePoints.zip(sparsePoints).forall { case (dv, sv) => dv.toArray === sv.toArray }, - s"typo in unit test") - val denseRDD = sc.parallelize(densePoints, 2) - val sparseRDD = sc.parallelize(sparsePoints, 2) - - val denseDatasetIndexer = new DatasetIndexer(maxCategories = 2) - val 
sparseDatasetIndexer = new DatasetIndexer(maxCategories = 2) - denseDatasetIndexer.fit(denseRDD) - sparseDatasetIndexer.fit(sparseRDD) - val denseFeatureIndexes = denseDatasetIndexer.getCategoricalFeatureIndexes - val sparseFeatureIndexes = sparseDatasetIndexer.getCategoricalFeatureIndexes - val categoricalFeatures = denseFeatureIndexes.keys.toSet - assert(categoricalFeatures == sparseFeatureIndexes.keys.toSet, - "Categorical features chosen from dense vs. sparse vectors did not match.") - - assert(denseFeatureIndexes == sparseFeatureIndexes, - "Categorical feature value indexes chosen from dense vs. sparse vectors did not match.") - } - - testDenseSparse(DatasetIndexerSuite.densePoints1, DatasetIndexerSuite.sparsePoints1) - testDenseSparse(DatasetIndexerSuite.densePoints2, DatasetIndexerSuite.sparsePoints2) - } - - test("Builds correct categorical feature value index, and transform correctly") { - def checkCategoricalFeatureIndex( - rdd: RDD[Vector], - maxCategories: Int, - categoricalFeatures: Set[Int]): Unit = { - val datasetIndexer = new DatasetIndexer(maxCategories = maxCategories) - datasetIndexer.fit(rdd) - val featureIndex = datasetIndexer.getCategoricalFeatureIndexes - assert(featureIndex.keys.toSet === categoricalFeatures) - val indexedRDD = datasetIndexer.transform(rdd) - categoricalFeatures.foreach { catFeature => - val origValueSet = rdd.collect().map(_(catFeature)).toSet - val targetValueIndexSet = Range(0, origValueSet.size).toSet - val valueIndex = featureIndex(catFeature) - assert(valueIndex.keys.toSet === origValueSet) - assert(valueIndex.values.toSet === targetValueIndexSet) - if (origValueSet.contains(0.0)) { - assert(valueIndex(0.0) === 0) // value 0 gets index 0 - } - // Check transformed data - assert(indexedRDD.map(_(catFeature)).collect().toSet === targetValueIndexSet) - } - } - - val rdd1 = sc.parallelize(DatasetIndexerSuite.densePoints1, 2) - checkCategoricalFeatureIndex(rdd1, maxCategories = 2, categoricalFeatures = Set(0)) - checkCategoricalFeatureIndex(rdd1, maxCategories = 3, categoricalFeatures = Set(0, 2)) - - val rdd2 = sc.parallelize(DatasetIndexerSuite.densePoints2, 2) - checkCategoricalFeatureIndex(rdd2, maxCategories = 2, categoricalFeatures = Set(1, 2)) - } - - test("Maintain sparsity for sparse vectors") { - def checkSparsity(points: Seq[Vector], maxCategories: Int): Unit = { - val rdd = sc.parallelize(points, 2) - val datasetIndexer = new DatasetIndexer(maxCategories = maxCategories) - datasetIndexer.fit(rdd) - val indexedPoints = datasetIndexer.transform(rdd).collect() - points.zip(indexedPoints).foreach { case (orig: SparseVector, indexed: SparseVector) => - assert(orig.indices.size == indexed.indices.size) - } - } - checkSparsity(DatasetIndexerSuite.sparsePoints1, maxCategories = 2) - checkSparsity(DatasetIndexerSuite.sparsePoints2, maxCategories = 2) - } -} - -object DatasetIndexerSuite { - private val densePoints1 = Seq( - Array(1.0, 2.0, 0.0), - Array(0.0, 1.0, 2.0), - Array(0.0, 0.0, -1.0), - Array(1.0, 3.0, 2.0)).map(Vectors.dense) - private val sparsePoints1 = Seq( - Vectors.sparse(3, Array(0, 1), Array(1.0, 2.0)), - Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)), - Vectors.sparse(3, Array(2), Array(-1.0)), - Vectors.sparse(3, Array(0, 1, 2), Array(1.0, 3.0, 2.0))) - - private val densePoints2 = Seq( - Array(1.0, 1.0, 0.0), - Array(0.0, 1.0, 0.0), - Array(-1.0, 1.0, 0.0)).map(Vectors.dense) - private val sparsePoints2 = Seq( - Vectors.sparse(3, Array(0, 1), Array(1.0, 1.0)), - Vectors.sparse(3, Array(1), Array(1.0)), - Vectors.sparse(3, 
Array(0, 1), Array(-1.0, 1.0))) -} From 02236c35623a2a8b95497757feb638e18671d961 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Fri, 10 Apr 2015 00:03:27 -0400 Subject: [PATCH 10/13] Updated VectorIndexer, ready for PR --- .../scala/org/apache/spark/ml/Pipeline.scala | 3 + .../spark/ml/attribute/AttributeGroup.scala | 21 ++- .../spark/ml/feature/VectorIndexer.scala | 127 +++++++++++++++--- .../ml/attribute/AttributeGroupSuite.scala | 8 +- .../spark/ml/feature/VectorIndexerSuite.scala | 87 ++++++++++-- .../apache/spark/ml/util/TestingUtils.scala | 57 ++++++++ 6 files changed, 259 insertions(+), 44 deletions(-) create mode 100644 mllib/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala index c4a36103303a2..a455341a1f723 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala @@ -47,6 +47,9 @@ abstract class PipelineStage extends Serializable with Logging { /** * Derives the output schema from the input schema and parameters, optionally with logging. + * + * This should be optimistic. If it is unclear whether the schema will be valid, then it should + * be assumed valid until proven otherwise. */ protected def transformSchema( schema: StructType, diff --git a/mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala b/mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala index 970e6ad5514d1..aa27a668f1695 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala @@ -106,7 +106,7 @@ class AttributeGroup private ( def getAttr(attrIndex: Int): Attribute = this(attrIndex) /** Converts to metadata without name. */ - private[attribute] def toMetadata: Metadata = { + private[attribute] def toMetadataImpl: Metadata = { import AttributeKeys._ val bldr = new MetadataBuilder() if (attributes.isDefined) { @@ -142,17 +142,24 @@ class AttributeGroup private ( bldr.build() } - /** Converts to a StructField with some existing metadata. */ - def toStructField(existingMetadata: Metadata): StructField = { - val newMetadata = new MetadataBuilder() + /** Converts to ML metadata with some existing metadata. */ + def toMetadata(existingMetadata: Metadata): Metadata = { + new MetadataBuilder() .withMetadata(existingMetadata) - .putMetadata(AttributeKeys.ML_ATTR, toMetadata) + .putMetadata(AttributeKeys.ML_ATTR, toMetadataImpl) .build() - StructField(name, new VectorUDT, nullable = false, newMetadata) + } + + /** Converts to ML metadata */ + def toMetadata: Metadata = toMetadata(Metadata.empty) + + /** Converts to a StructField with some existing metadata. */ + def toStructField(existingMetadata: Metadata): StructField = { + StructField(name, new VectorUDT, nullable = false, toMetadata(existingMetadata)) } /** Converts to a StructField. 
*/ - def toStructField(): StructField = toStructField(Metadata.empty) + def toStructField: StructField = toStructField(Metadata.empty) override def equals(other: Any): Boolean = { other match { diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala index 22abc0b4ba368..0832ef72bc6da 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala @@ -19,11 +19,13 @@ package org.apache.spark.ml.feature import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.attribute.{BinaryAttribute, NumericAttribute, NominalAttribute, + Attribute, AttributeGroup} import org.apache.spark.ml.param.{HasInputCol, HasOutputCol, IntParam, ParamMap, Params} import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector, VectorUDT} import org.apache.spark.sql.{Row, DataFrame} import org.apache.spark.sql.functions.callUDF -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.util.collection.OpenHashSet @@ -43,16 +45,6 @@ private[ml] trait VectorIndexerParams extends Params with HasInputCol with HasOu /** @group getParam */ def getMaxCategories: Int = get(maxCategories) - - protected def validateAndTransformSchema(schema: StructType, paramMap: ParamMap): StructType = { - val map = this.paramMap ++ paramMap - val dataType = new VectorUDT - val className = this.getClass.getSimpleName - require(map.contains(inputCol), s"$className requires input column parameter: $inputCol") - require(map.contains(outputCol), s"$className requires output column parameter: $outputCol") - checkInputColumn(schema, map(inputCol), dataType) - addOutputColumn(schema, map(outputCol), dataType) - } } /** @@ -89,10 +81,16 @@ private[ml] trait VectorIndexerParams extends Params with HasInputCol with HasOu * That unknown category index should be index 1; this will allow maintaining sparsity * (reserving index 0 for value 0.0), and it will allow category indices to remain fixed * even if more categories are added later. + * + * TODO: Currently, this does not preserve input metadata in the output column. Before this class + * becomes non-experimental, we should preserve metadata for categorical features (but also + * check that the input data is valid for that metadata). */ @AlphaComponent class VectorIndexer extends Estimator[VectorIndexerModel] with VectorIndexerParams { + // TODO: If a feature is already marked as categorical in metadata, do not index it. 
+ /** @group setParam */ def setMaxCategories(value: Int): this.type = { require(value > 1, @@ -110,7 +108,7 @@ class VectorIndexer extends Estimator[VectorIndexerModel] with VectorIndexerPara transformSchema(dataset.schema, paramMap, logging = true) val map = this.paramMap ++ paramMap val firstRow = dataset.select(map(inputCol)).take(1) - require(firstRow.size == 1, s"VectorIndexer cannot be fit on an empty dataset.") + require(firstRow.length == 1, s"VectorIndexer cannot be fit on an empty dataset.") val numFeatures = firstRow(0).getAs[Vector](0).size val vectorDataset = dataset.select(map(inputCol)).map { case Row(v: Vector) => v } val maxCats = map(maxCategories) @@ -125,7 +123,14 @@ class VectorIndexer extends Estimator[VectorIndexerModel] with VectorIndexerPara } override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { - validateAndTransformSchema(schema, paramMap) + // We do not transfer feature metadata since we do not know what types of features we will + // produce in transform(). + val map = this.paramMap ++ paramMap + val dataType = new VectorUDT + require(map.contains(inputCol), s"VectorIndexer requires input column parameter: $inputCol") + require(map.contains(outputCol), s"VectorIndexer requires output column parameter: $outputCol") + checkInputColumn(schema, map(inputCol), dataType) + addOutputColumn(schema, map(outputCol), dataType) } } @@ -189,7 +194,7 @@ private object VectorIndexer { // Get feature values, but remove 0 to treat separately. // If value 0 exists, give it index 0 to maintain sparsity if possible. var sortedFeatureValues = featureValues.iterator.filter(_ != 0.0).toArray.sorted - val zeroExists = sortedFeatureValues.size + 1 == featureValues.size + val zeroExists = sortedFeatureValues.length + 1 == featureValues.size if (zeroExists) { sortedFeatureValues = 0.0 +: sortedFeatureValues } @@ -213,7 +218,7 @@ private object VectorIndexer { var vecIndex = 0 // index into vector var k = 0 // index into non-zero elements while (vecIndex < sv.size) { - val featureValue = if (k < sv.indices.size && vecIndex == sv.indices(k)) { + val featureValue = if (k < sv.indices.length && vecIndex == sv.indices(k)) { k += 1 sv.values(k - 1) } else { @@ -252,6 +257,37 @@ class VectorIndexerModel private[ml] ( val categoryMaps: Map[Int, Map[Double, Int]]) extends Model[VectorIndexerModel] with VectorIndexerParams { + /** + * Pre-computed feature attributes, with some missing info. + * In transform(), set attribute name and other info, if available. 
+ */ + private val partialFeatureAttributes: Array[Attribute] = { + val attrs = new Array[Attribute](numFeatures) + var categoricalFeatureCount = 0 // validity check for numFeatures, categoryMaps + var featureIndex = 0 + while (featureIndex < numFeatures) { + if (categoryMaps.contains(featureIndex)) { + // categorical feature + val featureValues = categoryMaps(featureIndex).toArray.sortBy(_._1).map(_._1) + if (featureValues.length == 2) { + attrs(featureIndex) = new BinaryAttribute(index = Some(featureIndex), + values = Some(featureValues.map(_.toString))) + } else { + attrs(featureIndex) = new NominalAttribute(index = Some(featureIndex), + isOrdinal = Some(false), values = Some(featureValues.map(_.toString))) + } + categoricalFeatureCount += 1 + } else { + // continuous feature + attrs(featureIndex) = new NumericAttribute(index = Some(featureIndex)) + } + featureIndex += 1 + } + require(categoricalFeatureCount == categoryMaps.size, "VectorIndexerModel given categoryMaps" + + s" with keys outside expected range [0,...,numFeatures), where numFeatures=$numFeatures") + attrs + } + // TODO: Check more carefully about whether this whole class will be included in a closure. private val transformFunc: Vector => Vector = { @@ -268,8 +304,8 @@ class VectorIndexerModel private[ml] ( // We use the fact that categorical value 0 is always mapped to index 0. val tmpv = sv.copy var catFeatureIdx = 0 // index into sortedCategoricalFeatureIndices - var k = 0 // index into non-zero elements of sparse vector - while (catFeatureIdx < sortedCategoricalFeatureIndices.size && k < tmpv.indices.size) { + var k = 0 // index into non-zero elements of sparse vector + while (catFeatureIdx < sortedCategoricalFeatureIndices.length && k < tmpv.indices.length) { val featureIndex = sortedCategoricalFeatureIndices(catFeatureIdx) if (featureIndex < tmpv.indices(k)) { catFeatureIdx += 1 @@ -295,18 +331,69 @@ class VectorIndexerModel private[ml] ( override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { transformSchema(dataset.schema, paramMap, logging = true) val map = this.paramMap ++ paramMap + val newField = prepOutputField(dataset.schema, map) val newCol = callUDF(transformFunc, new VectorUDT, dataset(map(inputCol))) // For now, just check the first row of inputCol for vector length. 
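The per-feature attributes prepared above are written into the output column's metadata, so callers can recover which features were indexed and what their categories are. A minimal read-back sketch, assuming a fitted `model`, an input DataFrame `dataset`, and an output column named "indexed":

    val indexed = model.transform(dataset)
    val group = AttributeGroup.fromStructField(indexed.schema("indexed"))
    group.attributes.foreach { attrs =>
      attrs.foreach {
        case a: BinaryAttribute  => println(s"feature ${a.index.get}: binary, values ${a.values.get.mkString(",")}")
        case a: NominalAttribute => println(s"feature ${a.index.get}: categorical, values ${a.values.get.mkString(",")}")
        case a: NumericAttribute => println(s"feature ${a.index.get}: continuous")
        case other               => println(s"feature ${other.index}: $other")
      }
    }
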
val firstRow = dataset.select(map(inputCol)).take(1) - if (firstRow.size != 0) { + if (firstRow.length != 0) { val actualNumFeatures = firstRow(0).getAs[Vector](0).size require(numFeatures == actualNumFeatures, "VectorIndexerModel expected vector of length" + s" $numFeatures but found length $actualNumFeatures") } - dataset.withColumn(map(outputCol), newCol) + dataset.withColumn(map(outputCol), newCol.as(map(outputCol), newField.metadata)) } override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { - validateAndTransformSchema(schema, paramMap) + val map = this.paramMap ++ paramMap + val dataType = new VectorUDT + require(map.contains(inputCol), + s"VectorIndexerModel requires input column parameter: $inputCol") + require(map.contains(outputCol), + s"VectorIndexerModel requires output column parameter: $outputCol") + checkInputColumn(schema, map(inputCol), dataType) + + val origAttrGroup = AttributeGroup.fromStructField(schema(map(inputCol))) + val origNumFeatures: Option[Int] = if (origAttrGroup.attributes.nonEmpty) { + Some(origAttrGroup.attributes.get.length) + } else { + origAttrGroup.numAttributes + } + require(origNumFeatures.forall(_ == numFeatures), "VectorIndexerModel expected" + + s" $numFeatures features, but input column ${map(inputCol)} had metadata specifying" + + s" ${origAttrGroup.numAttributes.get} features.") + + val newField = prepOutputField(schema, map) + val outputFields = schema.fields :+ newField + StructType(outputFields) + } + + // TODO: Figure out a way to avoid transforming the schema twice. + // It could be expensive if there are many features and few instances. + private def prepOutputField(schema: StructType, map: ParamMap): StructField = { + val origAttrGroup = AttributeGroup.fromStructField(schema(map(inputCol))) + val featureAttributes: Array[Attribute] = if (origAttrGroup.attributes.nonEmpty) { + // Convert original attributes to modified attributes + val origAttrs: Array[Attribute] = origAttrGroup.attributes.get + origAttrs.zip(partialFeatureAttributes).map { + case (origAttr: Attribute, featAttr: BinaryAttribute) => + if (origAttr.name.nonEmpty) { + featAttr.withName(origAttr.name.get) + } else { + featAttr + } + case (origAttr: Attribute, featAttr: NominalAttribute) => + if (origAttr.name.nonEmpty) { + featAttr.withName(origAttr.name.get) + } else { + featAttr + } + case (origAttr: Attribute, featAttr: NumericAttribute) => + origAttr.withIndex(featAttr.index.get) + } + } else { + partialFeatureAttributes + } + val newAttributeGroup = new AttributeGroup(map(outputCol), featureAttributes) + newAttributeGroup.toStructField(schema(map(inputCol)).metadata) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeGroupSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeGroupSuite.scala index 3fb6e2ec46468..0dcfe5a2002dc 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeGroupSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeGroupSuite.scala @@ -43,8 +43,8 @@ class AttributeGroupSuite extends FunSuite { intercept[NoSuchElementException] { group("abc") } - assert(group === AttributeGroup.fromMetadata(group.toMetadata, group.name)) - assert(group === AttributeGroup.fromStructField(group.toStructField())) + assert(group === AttributeGroup.fromMetadata(group.toMetadataImpl, group.name)) + assert(group === AttributeGroup.fromStructField(group.toStructField)) } test("attribute group without attributes") { @@ -53,8 +53,8 @@ class AttributeGroupSuite 
extends FunSuite { assert(group0.numAttributes === Some(10)) assert(group0.size === 10) assert(group0.attributes.isEmpty) - assert(group0 === AttributeGroup.fromMetadata(group0.toMetadata, group0.name)) - assert(group0 === AttributeGroup.fromStructField(group0.toStructField())) + assert(group0 === AttributeGroup.fromMetadata(group0.toMetadataImpl, group0.name)) + assert(group0 === AttributeGroup.fromStructField(group0.toStructField)) val group1 = new AttributeGroup("item") assert(group1.name === "item") diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala index 644b45eb5e549..6c46a5ccc9706 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala @@ -20,7 +20,9 @@ package org.apache.spark.ml.feature import org.scalatest.FunSuite import org.apache.spark.SparkException +import org.apache.spark.ml.attribute._ import org.apache.spark.ml.feature.FeatureTests.DataSet +import org.apache.spark.ml.util.TestingUtils import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.rdd.RDD @@ -34,6 +36,7 @@ class VectorIndexerSuite extends FunSuite with MLlibTestSparkContext { // identical, of length 3 @transient var densePoints1: DataFrame = _ @transient var sparsePoints1: DataFrame = _ + @transient var point1maxes: Array[Double] = _ // identical, of length 2 @transient var densePoints2: DataFrame = _ @@ -55,6 +58,7 @@ class VectorIndexerSuite extends FunSuite with MLlibTestSparkContext { Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)), Vectors.sparse(3, Array(2), Array(-1.0)), Vectors.sparse(3, Array(0, 1, 2), Array(1.0, 3.0, 2.0))) + point1maxes = Array(1.0, 3.0, 2.0) val densePoints2Seq = Seq( Vectors.dense(1.0, 1.0, 0.0, 1.0), @@ -70,9 +74,9 @@ class VectorIndexerSuite extends FunSuite with MLlibTestSparkContext { Vectors.sparse(3, Array(2), Array(-1.0))) // Sanity checks for assumptions made in tests - assert(densePoints1Seq(0).size == sparsePoints1Seq(0).size) - assert(densePoints2Seq(0).size == sparsePoints2Seq(0).size) - assert(densePoints1Seq(0).size != densePoints2Seq(0).size) + assert(densePoints1Seq.head.size == sparsePoints1Seq.head.size) + assert(densePoints2Seq.head.size == sparsePoints2Seq.head.size) + assert(densePoints1Seq.head.size != densePoints2Seq.head.size) def checkPair(dvSeq: Seq[Vector], svSeq: Seq[Vector]): Unit = { assert(dvSeq.zip(svSeq).forall { case (dv, sv) => dv.toArray === sv.toArray }, "typo in unit test") @@ -131,35 +135,64 @@ class VectorIndexerSuite extends FunSuite with MLlibTestSparkContext { testDenseSparse(densePoints2, sparsePoints2) } - test("Builds valid categorical feature value index, and transform correctly") { + test("Builds valid categorical feature value index, transform correctly, check metadata") { def checkCategoryMaps( data: DataFrame, maxCategories: Int, categoricalFeatures: Set[Int]): Unit = { val collectedData = data.collect().map(_.getAs[Vector](0)) + val errMsg = s"checkCategoryMaps failed for input with maxCategories=$maxCategories," + + s" categoricalFeatures=${categoricalFeatures.mkString(", ")}" try { val vectorIndexer = getIndexer.setMaxCategories(maxCategories) val model = vectorIndexer.fit(data) val categoryMaps = model.categoryMaps assert(categoryMaps.keys.toSet === categoricalFeatures) // Chose correct categorical features - val 
indexedRDD: RDD[Vector] = - model.transform(data).select("indexed").map(_.getAs[Vector](0)) - categoricalFeatures.foreach { catFeature: Int => - val origValueSet = collectedData.map(_(catFeature)).toSet + val transformed = model.transform(data).select("indexed") + val indexedRDD: RDD[Vector] = transformed.map(_.getAs[Vector](0)) + val featureAttrs = AttributeGroup.fromStructField(transformed.schema("indexed")) + assert(featureAttrs.name === "indexed") + assert(featureAttrs.attributes.get.length === model.numFeatures) + categoricalFeatures.foreach { feature: Int => + val origValueSet = collectedData.map(_(feature)).toSet val targetValueIndexSet = Range(0, origValueSet.size).toSet - val catMap = categoryMaps(catFeature) + val catMap = categoryMaps(feature) assert(catMap.keys.toSet === origValueSet) // Correct categories assert(catMap.values.toSet === targetValueIndexSet) // Correct category indices if (origValueSet.contains(0.0)) { assert(catMap(0.0) === 0) // value 0 gets index 0 } // Check transformed data - assert(indexedRDD.map(_(catFeature)).collect().toSet === targetValueIndexSet) + assert(indexedRDD.map(_(feature)).collect().toSet === targetValueIndexSet) + // Check metadata + val featureAttr = featureAttrs(feature) + assert(featureAttr.index.get === feature) + featureAttr match { + case attr: BinaryAttribute => + assert(attr.values.get === origValueSet.toArray.sorted.map(_.toString)) + case attr: NominalAttribute => + assert(attr.values.get === origValueSet.toArray.sorted.map(_.toString)) + assert(attr.isOrdinal.get === false) + case _ => + throw new RuntimeException(errMsg + s". Categorical feature $feature failed" + + s" metadata check. Found feature attribute: $featureAttr.") + } + } + // Check numerical feature metadata. + Range(0, model.numFeatures).filter(feature => !categoricalFeatures.contains(feature)) + .foreach { feature: Int => + val featureAttr = featureAttrs(feature) + featureAttr match { + case attr: NumericAttribute => + assert(featureAttr.index.get === feature) + case _ => + throw new RuntimeException(errMsg + s". Numerical feature $feature failed" + + s" metadata check. Found feature attribute: $featureAttr.") + } } } catch { case e: org.scalatest.exceptions.TestFailedException => - println(s"checkCategoryMaps failed for input with maxCategories=$maxCategories," + - s" categoricalFeatures=${categoricalFeatures.mkString(", ")}") + println(errMsg) throw e } } @@ -176,11 +209,39 @@ class VectorIndexerSuite extends FunSuite with MLlibTestSparkContext { val indexedPoints = model.transform(data).select("indexed").map(_.getAs[Vector](0)).collect() points.zip(indexedPoints).foreach { case (orig: SparseVector, indexed: SparseVector) => - assert(orig.indices.size == indexed.indices.size) + assert(orig.indices.length == indexed.indices.length) case _ => throw new UnknownError("Unit test has a bug in it.") // should never happen } } checkSparsity(sparsePoints1, maxCategories = 2) checkSparsity(sparsePoints2, maxCategories = 2) } + + test("Preserve metadata") { + // For continuous features, preserve name and stats. 
+ val featureAttributes: Array[Attribute] = point1maxes.zipWithIndex.map { case (maxVal, i) => + NumericAttribute.defaultAttr.withName(i.toString).withMax(maxVal) + } + val attrGroup = new AttributeGroup("features", featureAttributes) + val densePoints1WithMeta = + densePoints1.select(densePoints1("features").as("features", attrGroup.toMetadata)) + val vectorIndexer = getIndexer.setMaxCategories(2) + val model = vectorIndexer.fit(densePoints1WithMeta) + // Check that ML metadata are preserved. + val indexedPoints = model.transform(densePoints1WithMeta) + val transAttributes: Array[Attribute] = + AttributeGroup.fromStructField(indexedPoints.schema("indexed")).attributes.get + featureAttributes.zip(transAttributes).foreach { case (orig, trans) => + assert(orig.name === trans.name) + (orig, trans) match { + case (orig: NumericAttribute, trans: NumericAttribute) => + assert(orig.max.nonEmpty && orig.max === trans.max) + case _ => + // do nothing + // TODO: Once input features marked as categorical are handled correctly, check that here. + } + } + // Check that non-ML metadata are preserved. + TestingUtils.testPreserveMetadata(densePoints1WithMeta, model, "features", "indexed") + } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala b/mllib/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala new file mode 100644 index 0000000000000..126a32f1b4d1a --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.util + +import org.apache.spark.ml.Transformer +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.types.MetadataBuilder +import org.scalatest.FunSuite + +private[ml] object TestingUtils extends FunSuite { + + /** + * Test whether unrelated metadata are preserved for this transformer. + * This attaches extra metadata to a column, transforms the column, and check to ensure the + * extra metadata have not changed. + * @param data Input dataset + * @param transformer Transformer to test + * @param inputCol Unique input column for Transformer. This must be the ONLY input column. + * @param outputCol Output column to test for metadata presence. 
+ */ + def testPreserveMetadata( + data: DataFrame, + transformer: Transformer, + inputCol: String, + outputCol: String): Unit = { + val origMetadata = data.schema(inputCol).metadata + val metaKey = "__testPreserveMetadata__fake_key" + val metaValue = 12345 + assert(!origMetadata.contains(metaKey), + s"Unit test with testPreserveMetadata will fail since metadata key was present: $metaKey") + val newMetadata = + new MetadataBuilder().withMetadata(origMetadata).putLong(metaKey, metaValue).build() + val withMetadata = data.select(data(inputCol).as(inputCol, newMetadata)) + val transformed = transformer.transform(withMetadata) + val transMetadata = transformed.schema(outputCol).metadata + assert(transMetadata.contains(metaKey), + "Unit test with testPreserveMetadata failed; extra metadata key was not present.") + assert(transMetadata.getLong(metaKey) === metaValue, + "Unit test with testPreserveMetadata failed; extra metadata value was wrong." + + s" Expected $metaValue but found ${transMetadata.getLong(metaKey)}") + } +} From 643b4449e1b1af025cf2dcd00c2fd90f9cbe4c29 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Fri, 10 Apr 2015 00:13:50 -0400 Subject: [PATCH 11/13] removed FeatureTests --- .../spark/ml/feature/FeatureTests.scala | 27 ------------------- .../spark/ml/feature/NormalizerSuite.scala | 7 +++-- .../spark/ml/feature/VectorIndexerSuite.scala | 19 ++++++++----- 3 files changed, 17 insertions(+), 36 deletions(-) delete mode 100644 mllib/src/test/scala/org/apache/spark/ml/feature/FeatureTests.scala diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureTests.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureTests.scala deleted file mode 100644 index 94a2eb5c209f5..0000000000000 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureTests.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.ml.feature - -import org.apache.spark.mllib.linalg.Vector - - -private[ml] object FeatureTests { - - case class DataSet(features: Vector) - -} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala index cee65fe13a06e..9d09f24709e23 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.ml.feature import org.scalatest.FunSuite -import org.apache.spark.ml.feature.FeatureTests.DataSet import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ @@ -63,7 +62,7 @@ class NormalizerSuite extends FunSuite with MLlibTestSparkContext { ) val sqlContext = new SQLContext(sc) - dataFrame = sqlContext.createDataFrame(sc.parallelize(data, 2).map(DataSet)) + dataFrame = sqlContext.createDataFrame(sc.parallelize(data, 2).map(NormalizerSuite.FeatureData)) normalizer = new Normalizer() .setInputCol("features") .setOutputCol("normalized_features") @@ -107,3 +106,7 @@ class NormalizerSuite extends FunSuite with MLlibTestSparkContext { assertValues(result, l1Normalized) } } + +private object NormalizerSuite { + case class FeatureData(features: Vector) +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala index 6c46a5ccc9706..27fbe205ccd32 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala @@ -21,7 +21,6 @@ import org.scalatest.FunSuite import org.apache.spark.SparkException import org.apache.spark.ml.attribute._ -import org.apache.spark.ml.feature.FeatureTests.DataSet import org.apache.spark.ml.util.TestingUtils import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -31,6 +30,8 @@ import org.apache.spark.sql.{DataFrame, SQLContext} class VectorIndexerSuite extends FunSuite with MLlibTestSparkContext { + import VectorIndexerSuite.FeatureData + @transient var sqlContext: SQLContext = _ // identical, of length 3 @@ -85,18 +86,18 @@ class VectorIndexerSuite extends FunSuite with MLlibTestSparkContext { checkPair(densePoints2Seq, sparsePoints2Seq) sqlContext = new SQLContext(sc) - densePoints1 = sqlContext.createDataFrame(sc.parallelize(densePoints1Seq, 2).map(DataSet)) - sparsePoints1 = sqlContext.createDataFrame(sc.parallelize(sparsePoints1Seq, 2).map(DataSet)) - densePoints2 = sqlContext.createDataFrame(sc.parallelize(densePoints2Seq, 2).map(DataSet)) - sparsePoints2 = sqlContext.createDataFrame(sc.parallelize(sparsePoints2Seq, 2).map(DataSet)) - badPoints = sqlContext.createDataFrame(sc.parallelize(badPointsSeq, 2).map(DataSet)) + densePoints1 = sqlContext.createDataFrame(sc.parallelize(densePoints1Seq, 2).map(FeatureData)) + sparsePoints1 = sqlContext.createDataFrame(sc.parallelize(sparsePoints1Seq, 2).map(FeatureData)) + densePoints2 = sqlContext.createDataFrame(sc.parallelize(densePoints2Seq, 2).map(FeatureData)) + sparsePoints2 = sqlContext.createDataFrame(sc.parallelize(sparsePoints2Seq, 2).map(FeatureData)) + badPoints = sqlContext.createDataFrame(sc.parallelize(badPointsSeq, 2).map(FeatureData)) } 
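For reference, the test suites above build their DataFrames by wrapping each Vector in a one-field case class, which lets createDataFrame infer a single "features" column by reflection on the field name. In isolation, with `vectors: Seq[Vector]` standing in for one of the sequences defined above, the pattern is roughly:

    case class FeatureData(features: Vector)  // kept in the suite's companion object
    val df: DataFrame = sqlContext.createDataFrame(sc.parallelize(vectors, 2).map(FeatureData))
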
private def getIndexer: VectorIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexed") test("Cannot fit an empty DataFrame") { - val rdd = sqlContext.createDataFrame(sc.parallelize(Array.empty[Vector], 2).map(DataSet)) + val rdd = sqlContext.createDataFrame(sc.parallelize(Array.empty[Vector], 2).map(FeatureData)) val vectorIndexer = getIndexer intercept[IllegalArgumentException] { vectorIndexer.fit(rdd) @@ -245,3 +246,7 @@ class VectorIndexerSuite extends FunSuite with MLlibTestSparkContext { TestingUtils.testPreserveMetadata(densePoints1WithMeta, model, "features", "indexed") } } + +private object VectorIndexerSuite { + case class FeatureData(features: Vector) +} From f5c57a80bb4a81466220974e2b2a6676d5e85459 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Sat, 11 Apr 2015 10:49:54 -0400 Subject: [PATCH 12/13] added Java test suite --- .../spark/ml/feature/VectorIndexer.scala | 25 +++---- .../ml/feature/JavaVectorIndexerSuite.java | 70 +++++++++++++++++++ .../spark/ml/feature/VectorIndexerSuite.scala | 7 +- 3 files changed, 85 insertions(+), 17 deletions(-) create mode 100644 mllib/src/test/java/org/apache/spark/ml/feature/JavaVectorIndexerSuite.java diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala index 0832ef72bc6da..050667820fedd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala @@ -58,11 +58,16 @@ private[ml] trait VectorIndexerParams extends Params with HasInputCol with HasOu * features and some categorical features. The choice between continuous and categorical * is based upon a maxCategories parameter. * - Set maxCategories to the maximum number of categorical any categorical feature should have. + * - E.g.: Feature 0 has unique values {-1.0, 0.0}, and feature 1 values {1.0, 3.0, 5.0}. + * If maxCategories = 2, then feature 0 will be declared categorical and use indices {0, 1}, + * and feature 1 will be declared continuous. * - Index all features, if all features are categorical * - If maxCategories is set to be very large, then this will build an index of unique * values for all features. * - Warning: This can cause problems if features are continuous since this will collect ALL * unique values to the driver. + * - E.g.: Feature 0 has unique values {-1.0, 0.0}, and feature 1 values {1.0, 3.0, 5.0}. + * If maxCategories >= 3, then both features will be declared categorical. * * This returns a model which can transform categorical features to use 0-based indices. * @@ -72,25 +77,15 @@ private[ml] trait VectorIndexerParams extends Params with HasInputCol with HasOu * This maintains vector sparsity. * - More stability may be added in the future. * - * TODO: Add warning if a categorical feature has only 1 category? - * - * TODO: Add option for allowing unknown categories: - * Parameter allowUnknownCategories: - * If true, then handle unknown categories during `transform` - * by assigning them to an extra category index. - * That unknown category index should be index 1; this will allow maintaining sparsity - * (reserving index 0 for value 0.0), and it will allow category indices to remain fixed - * even if more categories are added later. - * - * TODO: Currently, this does not preserve input metadata in the output column. 
Before this class - * becomes non-experimental, we should preserve metadata for categorical features (but also - * check that the input data is valid for that metadata). + * TODO: Future extensions: The following functionality is planned for the future: + * - Preserve metadata in transform; if a feature's metadata is already present, do not recompute. + * - Specify certain features to not index, either via a parameter or via existing metadata. + * - Add warning if a categorical feature has only 1 category. + * - Add option for allowing unknown categories. */ @AlphaComponent class VectorIndexer extends Estimator[VectorIndexerModel] with VectorIndexerParams { - // TODO: If a feature is already marked as categorical in metadata, do not index it. - /** @group setParam */ def setMaxCategories(value: Int): this.type = { require(value > 1, diff --git a/mllib/src/test/java/org/apache/spark/ml/feature/JavaVectorIndexerSuite.java b/mllib/src/test/java/org/apache/spark/ml/feature/JavaVectorIndexerSuite.java new file mode 100644 index 0000000000000..161100134c92d --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/ml/feature/JavaVectorIndexerSuite.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature; + +import java.io.Serializable; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; + +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.ml.feature.VectorIndexerSuite.FeatureData; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.SQLContext; + + +public class JavaVectorIndexerSuite implements Serializable { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaVectorIndexerSuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + } + + @Test + public void vectorIndexerAPI() { + // The tests are to check Java compatibility. 
+ List points = Lists.newArrayList( + new FeatureData(Vectors.dense(0.0, -2.0)), + new FeatureData(Vectors.dense(1.0, 3.0)), + new FeatureData(Vectors.dense(1.0, 4.0)) + ); + SQLContext sqlContext = new SQLContext(sc); + DataFrame data = sqlContext.createDataFrame(sc.parallelize(points, 2), FeatureData.class); + VectorIndexer indexer = new VectorIndexer() + .setInputCol("features") + .setOutputCol("indexed") + .setMaxCategories(2); + VectorIndexerModel model = indexer.fit(data); + Assert.assertEquals(model.numFeatures(), 2); + Assert.assertEquals(model.categoryMaps().size(), 1); + DataFrame indexedData = model.transform(data); + } +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala index 27fbe205ccd32..61c46c85a78b5 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.ml.feature +import scala.beans.{BeanInfo, BeanProperty} + import org.scalatest.FunSuite import org.apache.spark.SparkException @@ -247,6 +249,7 @@ class VectorIndexerSuite extends FunSuite with MLlibTestSparkContext { } } -private object VectorIndexerSuite { - case class FeatureData(features: Vector) +private[feature] object VectorIndexerSuite { + @BeanInfo + case class FeatureData(@BeanProperty features: Vector) } From 5956d9197de833bfee870dadd51bbed7ec136ea1 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Sat, 11 Apr 2015 11:44:38 -0400 Subject: [PATCH 13/13] minor cleanups --- .../spark/ml/feature/VectorIndexer.scala | 31 +++++++++---------- .../apache/spark/ml/util/TestingUtils.scala | 3 ++ 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala index 050667820fedd..8760960e19272 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala @@ -146,11 +146,7 @@ private object VectorIndexer { private val featureValueSets = Array.fill[OpenHashSet[Double]](numFeatures)(new OpenHashSet[Double]()) - /** - * Merge with another instance, modifying this instance. - * @param other Other instance, not modified - * @return This instance, modified - */ + /** Merge with another instance, modifying this instance. */ def merge(other: CategoryStats): CategoryStats = { featureValueSets.zip(other.featureValueSets).foreach { case (thisValSet, otherValSet) => otherValSet.iterator.foreach { x => @@ -186,8 +182,6 @@ private object VectorIndexer { // Filter out features which are declared continuous. featureValueSets.zipWithIndex.filter(_._1.size <= maxCategories).map { case (featureValues: OpenHashSet[Double], featureIndex: Int) => - // Get feature values, but remove 0 to treat separately. - // If value 0 exists, give it index 0 to maintain sparsity if possible. var sortedFeatureValues = featureValues.iterator.filter(_ != 0.0).toArray.sorted val zeroExists = sortedFeatureValues.length + 1 == featureValues.size if (zeroExists) { @@ -232,17 +226,17 @@ private object VectorIndexer { * :: AlphaComponent :: * * Transform categorical features to use 0-based indices instead of their original values. - * - Categorical features are mapped to their feature value indices. + * - Categorical features are mapped to indices. 
* - Continuous features (columns) are left unchanged. + * This also appends metadata to the output column, marking features as Numeric (continuous), + * Nominal (categorical), or Binary (either continuous or categorical). * * This maintains vector sparsity. * - * Note: If this model was created for vectors of length numFeatures, - * this model's transform method must be given vectors of length numFeatures. - * * @param numFeatures Number of features, i.e., length of Vectors which this transforms * @param categoryMaps Feature value index. Keys are categorical feature indices (column indices). * Values are maps from original features values to 0-based category indices. + * If a feature is not in this map, it is treated as continuous. */ @AlphaComponent class VectorIndexerModel private[ml] ( @@ -263,13 +257,14 @@ class VectorIndexerModel private[ml] ( while (featureIndex < numFeatures) { if (categoryMaps.contains(featureIndex)) { // categorical feature - val featureValues = categoryMaps(featureIndex).toArray.sortBy(_._1).map(_._1) + val featureValues: Array[String] = + categoryMaps(featureIndex).toArray.sortBy(_._1).map(_._1).map(_.toString) if (featureValues.length == 2) { attrs(featureIndex) = new BinaryAttribute(index = Some(featureIndex), - values = Some(featureValues.map(_.toString))) + values = Some(featureValues)) } else { attrs(featureIndex) = new NominalAttribute(index = Some(featureIndex), - isOrdinal = Some(false), values = Some(featureValues.map(_.toString))) + isOrdinal = Some(false), values = Some(featureValues)) } categoricalFeatureCount += 1 } else { @@ -362,8 +357,12 @@ class VectorIndexerModel private[ml] ( StructType(outputFields) } - // TODO: Figure out a way to avoid transforming the schema twice. - // It could be expensive if there are many features and few instances. + /** + * Prepare the output column field, including per-feature metadata. + * @param schema Input schema + * @param map Parameter map (with this class' embedded parameter map folded in) + * @return Output column field + */ private def prepOutputField(schema: StructType, map: ParamMap): StructField = { val origAttrGroup = AttributeGroup.fromStructField(schema(map(inputCol))) val featureAttributes: Array[Attribute] = if (origAttrGroup.attributes.nonEmpty) { diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala b/mllib/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala index 126a32f1b4d1a..c44cb61b34171 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/util/TestingUtils.scala @@ -38,6 +38,7 @@ private[ml] object TestingUtils extends FunSuite { transformer: Transformer, inputCol: String, outputCol: String): Unit = { + // Create some fake metadata val origMetadata = data.schema(inputCol).metadata val metaKey = "__testPreserveMetadata__fake_key" val metaValue = 12345 @@ -45,7 +46,9 @@ private[ml] object TestingUtils extends FunSuite { s"Unit test with testPreserveMetadata will fail since metadata key was present: $metaKey") val newMetadata = new MetadataBuilder().withMetadata(origMetadata).putLong(metaKey, metaValue).build() + // Add metadata to the inputCol val withMetadata = data.select(data(inputCol).as(inputCol, newMetadata)) + // Transform, and ensure extra metadata was not affected val transformed = transformer.transform(withMetadata) val transMetadata = transformed.schema(outputCol).metadata assert(transMetadata.contains(metaKey),