From 78249c235fc4fdc043d4a548ab3c8ede3a5466c8 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 10 Mar 2014 18:43:49 -0700 Subject: [PATCH 1/3] SPARK-1216. Add a OneHotEncoder for handling categorical features --- .../mllib/preprocessing/OneHotEncoder.scala | 112 ++++++++++++++++++ .../preprocessing/OneHotEncoderSuite.scala | 60 ++++++++++ 2 files changed, 172 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/preprocessing/OneHotEncoder.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/preprocessing/OneHotEncoderSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/preprocessing/OneHotEncoder.scala b/mllib/src/main/scala/org/apache/spark/mllib/preprocessing/OneHotEncoder.scala new file mode 100644 index 0000000000000..1011bcb765866 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/preprocessing/OneHotEncoder.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.preprocessing + +import org.apache.spark.rdd.RDD + +import scala.collection.mutable.HashSet +import scala.collection.mutable.Set + +/** + * A utility for encoding categorical variables as numeric variables. 
The resulting vectors + * contain a component for each value that the variable can take. The component corresponding + * to the value that the variable takes is set to 1 and the components corresponding to all other + * categories are set to 0. + * + * The utility handles input vectors with mixed categorical and numeric variables by accepting a + * list of feature indices that are categorical and only transforming those. + * + * An example usage is: + * + * {{{ + * val categoricalFields = Array(0, 7, 21) + * val categories = OneHotEncoder.categories(rdd, categoricalFields) + * val encoded = OneHotEncoder.encode(rdd, categories) + * }}} + */ +object OneHotEncoder { + + /** + * Given a dataset and the set of columns which are categorical variables, returns a structure + * that, for each field, describes the values that are present for in the dataset. The structure + * is meant to be used as input to encode. + */ + def categories(rdd: RDD[Array[Any]], categoricalFields: Seq[Int]): Array[Map[Any, Int]] = { + val categories = rdd.map(categoricals(_, categoricalFields)).reduce(uniqueCats) + + val catMaps = new Array[Map[Any, Int]](rdd.first().length) + for (i <- 0 until categoricalFields.length) { + catMaps(categoricalFields(i)) = categories(i).zipWithIndex.toMap + } + + catMaps + } + + /** + * Accepts a vector and set of feature indices that are categorical variables. 
Outputs an array + * whose size is the number of categorical fields and each element is a Set of size one + * containing a categorical value from the input vec + */ + private def categoricals(tokens: Array[Any], catFields: Seq[Int]): Array[Set[Any]] = { + val categoriesArr = new Array[Set[Any]](catFields.length) + for (i <- 0 until catFields.length) { + categoriesArr(i) = new HashSet[Any]() + categoriesArr(i) += tokens(catFields(i)) + } + categoriesArr + } + + private def uniqueCats(a: Array[Set[Any]], b: Array[Set[Any]]): Array[Set[Any]] = { + for (i <- 0 until a.length) { + a(i) ++= b(i) + } + a + } + + /** + * OneHot encodes the given RDD. + */ + def encode(rdd: RDD[Array[Any]], featureCategories: Array[Map[Any, Int]]): + RDD[Array[Any]] = { + var outArrLen = 0 + for (catMap <- featureCategories) { + outArrLen += (if (catMap == null) 1 else catMap.size) + } + rdd.map(encodeVec(_, featureCategories, outArrLen)) + } + + private def encodeVec(vec: Array[Any], featureCategories: Array[Map[Any, Int]], + outArrLen: Int): Array[Any] = { + var outArrIndex = 0 + val outVec = new Array[Any](outArrLen) + for (i <- 0 until vec.length) { + if (featureCategories(i) != null) { + for (j <- outArrIndex until outArrIndex + featureCategories(i).size) { + outVec(j) = 0 + } + outVec(outArrIndex + featureCategories(i).getOrElse(vec(i), -1)) = 1 + outArrIndex += featureCategories(i).size + } else { + outVec(outArrIndex) = vec(i) + outArrIndex += 1 + } + } + outVec + } + +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/preprocessing/OneHotEncoderSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/preprocessing/OneHotEncoderSuite.scala new file mode 100644 index 0000000000000..4dee5167de808 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/preprocessing/OneHotEncoderSuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.preprocessing + +import org.apache.spark.{SparkContext, SparkConf} + +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers + +class OneHotEncoderSuite extends FunSuite with ShouldMatchers { + + test("one hot encoder") { + val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Test OneHotEncoder")) + val vecs = Array( + Array("marcy playground", 1.3, "apple", 2), + Array("pearl jam", 3.5, "banana", 4), + Array("nirvana", 6.7, "apple", 3) + ) + val categoricalFields = Array(0, 2) + val rdd = sc.parallelize(vecs, 1) + + val catMap = OneHotEncoder.categories(rdd, categoricalFields) + val encoded = OneHotEncoder.encode(rdd, catMap) + + val result = encoded.collect() + result.size should be (vecs.size) + + val vec1 = Array[Any](0, 0, 0, 1.3, 0, 0, 2) + vec1(catMap(0).getOrElse("marcy playground", -1)) = 1 + vec1(4 + catMap(2).getOrElse("apple", -1)) = 1 + + val vec2 = Array[Any](0, 0, 0, 3.5, 0, 0, 4) + vec2(catMap(0).getOrElse("pearl jam", -1)) = 1 + vec2(4 + catMap(2).getOrElse("banana", -1)) = 1 + + val vec3 = Array[Any](0, 0, 0, 6.7, 0, 0, 3) + vec3(catMap(0).getOrElse("nirvana", -1)) = 1 + vec3(4 + catMap(2).getOrElse("apple", -1)) = 1 + + result(0) should equal (vec1) + result(1) should equal (vec2) + 
result(2) should equal (vec3) + } +} From ad171297db7320d4e0f32dbaeea31c945c938a2f Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 7 Apr 2014 11:27:51 -0700 Subject: [PATCH 2/3] Use LocalSparkContext --- .../apache/spark/mllib/preprocessing/OneHotEncoderSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/preprocessing/OneHotEncoderSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/preprocessing/OneHotEncoderSuite.scala index 4dee5167de808..a121f4deee16b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/preprocessing/OneHotEncoderSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/preprocessing/OneHotEncoderSuite.scala @@ -18,14 +18,14 @@ package org.apache.spark.mllib.preprocessing import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.mllib.util.LocalSparkContext import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers -class OneHotEncoderSuite extends FunSuite with ShouldMatchers { +class OneHotEncoderSuite extends FunSuite with LocalSparkContext with ShouldMatchers { test("one hot encoder") { - val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Test OneHotEncoder")) val vecs = Array( Array("marcy playground", 1.3, "apple", 2), Array("pearl jam", 3.5, "banana", 4), From 8e0efba08ab88306ac66706b7ac3b4f84f93d134 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Sat, 12 Apr 2014 10:16:10 -0700 Subject: [PATCH 3/3] Incorporate feedback --- .../mllib/preprocessing/OneHotEncoder.scala | 103 +++++++++++------- .../preprocessing/OneHotEncoderSuite.scala | 14 +-- 2 files changed, 73 insertions(+), 44 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/preprocessing/OneHotEncoder.scala b/mllib/src/main/scala/org/apache/spark/mllib/preprocessing/OneHotEncoder.scala index 1011bcb765866..b6c1ccc6ee261 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/preprocessing/OneHotEncoder.scala +++ 
b/mllib/src/main/scala/org/apache/spark/mllib/preprocessing/OneHotEncoder.scala @@ -17,20 +17,34 @@ package org.apache.spark.mllib.preprocessing -import org.apache.spark.rdd.RDD +import scala.collection.mutable +import scala.reflect.ClassTag -import scala.collection.mutable.HashSet -import scala.collection.mutable.Set +import org.apache.spark.rdd.RDD /** * A utility for encoding categorical variables as numeric variables. The resulting vectors * contain a component for each value that the variable can take. The component corresponding * to the value that the variable takes is set to 1 and the components corresponding to all other - * categories are set to 0. + * categories are set to 0 - [[http://en.wikipedia.org/wiki/One-hot]]. * * The utility handles input vectors with mixed categorical and numeric variables by accepting a * list of feature indices that are categorical and only transforming those. * + * The utility can transform vectors such as: + * {{{ + * (1.7, "apple", 2.0) + * (4.9, "banana", 5.6) + * (8.0, "pear", 6.0) + * }}} + * + * Into: + * {{{ + * (1.7, 1, 0, 0, 2.0) + * (4.9, 0, 1, 0, 5.6) + * (8.0, 0, 0, 1, 6.0) + * }}} + * * An example usage is: * * {{{ @@ -44,36 +58,40 @@ object OneHotEncoder { /** * Given a dataset and the set of columns which are categorical variables, returns a structure * that, for each field, describes the values that are present for in the dataset. The structure - * is meant to be used as input to encode. + * is meant to be used as input to the encode method. 
*/ - def categories(rdd: RDD[Array[Any]], categoricalFields: Seq[Int]): Array[Map[Any, Int]] = { - val categories = rdd.map(categoricals(_, categoricalFields)).reduce(uniqueCats) + def categories[T](rdd: RDD[Array[T]], catFields: Seq[Int]): Array[(Int, Map[T, Int])] = { + val categoriesArr = new Array[mutable.Set[T]](catFields.length) + for (i <- 0 until catFields.length) { + categoriesArr(i) = new mutable.HashSet[T]() + } + + val categories = rdd.aggregate(categoriesArr)(mergeElement(catFields), mergeSets) - val catMaps = new Array[Map[Any, Int]](rdd.first().length) - for (i <- 0 until categoricalFields.length) { - catMaps(categoricalFields(i)) = categories(i).zipWithIndex.toMap + val catMaps = new Array[(Int, Map[T, Int])](catFields.length) + for (i <- 0 until catFields.length) { + catMaps(i) = (catFields(i), categories(i).zipWithIndex.toMap) } catMaps } - /** - * Accepts a vector and set of feature indices that are categorical variables. Outputs an array - * whose size is the number of categorical fields and each element is a Set of size one - * containing a categorical value from the input vec - */ - private def categoricals(tokens: Array[Any], catFields: Seq[Int]): Array[Set[Any]] = { - val categoriesArr = new Array[Set[Any]](catFields.length) - for (i <- 0 until catFields.length) { - categoriesArr(i) = new HashSet[Any]() - categoriesArr(i) += tokens(catFields(i)) + private def mergeElement[T](catFields: Seq[Int])(a: Array[mutable.Set[T]], b: Array[T]): + Array[mutable.Set[T]] = { + var i = 0 + while (i < catFields.length) { + a(i) += b(catFields(i)) + i += 1 } - categoriesArr + a } - private def uniqueCats(a: Array[Set[Any]], b: Array[Set[Any]]): Array[Set[Any]] = { - for (i <- 0 until a.length) { + private def mergeSets[T](a: Array[mutable.Set[T]], b: Array[mutable.Set[T]]): + Array[mutable.Set[T]] = { + var i = 0 + while (i < a.length) { a(i) ++= b(i) + i += 1 } a } @@ -81,30 +99,41 @@ object OneHotEncoder { /** * OneHot encodes the given RDD. 
*/ - def encode(rdd: RDD[Array[Any]], featureCategories: Array[Map[Any, Int]]): - RDD[Array[Any]] = { - var outArrLen = 0 + def encode[T:ClassTag](rdd: RDD[Array[T]], featureCategories: Array[(Int, Map[T, Int])]): + RDD[Array[T]] = { + var outArrLen = rdd.first().length for (catMap <- featureCategories) { - outArrLen += (if (catMap == null) 1 else catMap.size) + outArrLen += (catMap._2.size - 1) } - rdd.map(encodeVec(_, featureCategories, outArrLen)) + rdd.map(encodeVec[T](_, featureCategories, outArrLen)) } - private def encodeVec(vec: Array[Any], featureCategories: Array[Map[Any, Int]], - outArrLen: Int): Array[Any] = { + private def encodeVec[T:ClassTag](vec: Array[T], featureCategories: Array[(Int, Map[T, Int])], + outArrLen: Int): Array[T] = { var outArrIndex = 0 - val outVec = new Array[Any](outArrLen) - for (i <- 0 until vec.length) { - if (featureCategories(i) != null) { - for (j <- outArrIndex until outArrIndex + featureCategories(i).size) { - outVec(j) = 0 + val outVec = new Array[T](outArrLen) + var i = 0 + var featureCatIndex = 0 + val zero = 0.asInstanceOf[T] + val one = 1.asInstanceOf[T] + while (i < vec.length) { + if (featureCatIndex < featureCategories.length && + featureCategories(featureCatIndex)._1 == i) { + var j = outArrIndex + val catVals = featureCategories(featureCatIndex)._2 + while (j < outArrIndex + catVals.size) { + outVec(j) = zero + j += 1 } - outVec(outArrIndex + featureCategories(i).getOrElse(vec(i), -1)) = 1 - outArrIndex += featureCategories(i).size + outVec(outArrIndex + catVals.getOrElse(vec(i), -1)) = one + outArrIndex += catVals.size + featureCatIndex += 1 } else { outVec(outArrIndex) = vec(i) outArrIndex += 1 } + + i += 1 } outVec } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/preprocessing/OneHotEncoderSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/preprocessing/OneHotEncoderSuite.scala index a121f4deee16b..00b73dbea2a3b 100644 --- 
a/mllib/src/test/scala/org/apache/spark/mllib/preprocessing/OneHotEncoderSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/preprocessing/OneHotEncoderSuite.scala @@ -32,7 +32,7 @@ class OneHotEncoderSuite extends FunSuite with LocalSparkContext with ShouldMatc Array("nirvana", 6.7, "apple", 3) ) val categoricalFields = Array(0, 2) - val rdd = sc.parallelize(vecs, 1) + val rdd = sc.parallelize(vecs, 2) val catMap = OneHotEncoder.categories(rdd, categoricalFields) val encoded = OneHotEncoder.encode(rdd, catMap) @@ -41,16 +41,16 @@ class OneHotEncoderSuite extends FunSuite with LocalSparkContext with ShouldMatc result.size should be (vecs.size) val vec1 = Array[Any](0, 0, 0, 1.3, 0, 0, 2) - vec1(catMap(0).getOrElse("marcy playground", -1)) = 1 - vec1(4 + catMap(2).getOrElse("apple", -1)) = 1 + vec1(catMap(0)._2.getOrElse("marcy playground", -1)) = 1 + vec1(4 + catMap(1)._2.getOrElse("apple", -1)) = 1 val vec2 = Array[Any](0, 0, 0, 3.5, 0, 0, 4) - vec2(catMap(0).getOrElse("pearl jam", -1)) = 1 - vec2(4 + catMap(2).getOrElse("banana", -1)) = 1 + vec2(catMap(0)._2.getOrElse("pearl jam", -1)) = 1 + vec2(4 + catMap(1)._2.getOrElse("banana", -1)) = 1 val vec3 = Array[Any](0, 0, 0, 6.7, 0, 0, 3) - vec3(catMap(0).getOrElse("nirvana", -1)) = 1 - vec3(4 + catMap(2).getOrElse("apple", -1)) = 1 + vec3(catMap(0)._2.getOrElse("nirvana", -1)) = 1 + vec3(4 + catMap(1)._2.getOrElse("apple", -1)) = 1 result(0) should equal (vec1) result(1) should equal (vec2)