From 5fe190e481ba35f5e14575ba26ce8ff3ff29588e Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 7 May 2015 16:09:25 +0800 Subject: [PATCH 01/12] add bucketizer --- .../apache/spark/ml/feature/Bucketizer.scala | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala new file mode 100644 index 0000000000000..d9880cf11d9a0 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.attribute.{NominalAttribute, BinaryAttribute} +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} +import org.apache.spark.ml.util.SchemaUtils +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{DoubleType, StructType} + +/** + * :: AlphaComponent :: + * Binarize a column of continuous features given a threshold. + */ +@AlphaComponent +final class Bucketizer extends Transformer with HasInputCol with HasOutputCol { + + /** + * Param for threshold used to binarize continuous features. + * The features greater than the threshold, will be binarized to 1.0. + * The features equal to or less than the threshold, will be binarized to 0.0. + * @group param + */ + val buckets: Param[Array[Double]] = new Param[Array[Double]](this, "buckets", "") + + /** @group getParam */ + def getBuckets: Array[Double] = $(buckets) + + /** @group setParam */ + def setBuckets(value: Array[Double]): this.type = set(buckets, value) + + /** @group setParam */ + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + def setOutputCol(value: String): this.type = set(outputCol, value) + + override def transform(dataset: DataFrame): DataFrame = { + transformSchema(dataset.schema) + val bucketizer = udf { feature: Double => binarySearchForBins($(buckets), feature) } + val outputColName = $(outputCol) + val metadata = NominalAttribute.defaultAttr + .withName(outputColName).withValues($(buckets).map(_.toString)).toMetadata() + dataset.select(col("*"), bucketizer(dataset($(inputCol))).as(outputColName, metadata)) + } + + /** + * Binary searching in several bins to place each data point. 
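+   * As an illustrative example (values are made up): with buckets
+   * Array(-0.5, 0.0, 0.5), the wrapped splits become
+   * [Double.MinValue, -0.5, 0.0, 0.5, Double.MaxValue], so a feature of 0.2
+   * satisfies 0.0 < 0.2 <= 0.5 and maps to bucket 2, while a feature of
+   * exactly -0.5 maps to bucket 0, since each bucket here is the half-open
+   * range (x, y].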
+ */ + private def binarySearchForBins(splits: Array[Double], feature: Double): Double = { + val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) + var left = 0 + var right = wrappedSplits.length - 2 + while (left <= right) { + val mid = left + (right - left) / 2 + val split = wrappedSplits(mid) + if ((feature > split) && (feature <= wrappedSplits(mid + 1))) { + return mid + } else if (feature <= split) { + right = mid - 1 + } else { + left = mid + 1 + } + } + -1 + } + + override def transformSchema(schema: StructType): StructType = { + SchemaUtils.checkColumnType(schema, $(inputCol), DoubleType) + + val inputFields = schema.fields + val outputColName = $(outputCol) + + require(inputFields.forall(_.name != outputColName), + s"Output column $outputColName already exists.") + + val attr = NominalAttribute.defaultAttr.withName(outputColName) + val outputFields = inputFields :+ attr.toStructField() + StructType(outputFields) + } +} From 4024cf1a74ba70f92d21e92db1b7449e72c88357 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 7 May 2015 17:30:20 +0800 Subject: [PATCH 02/12] add test suite --- .../apache/spark/ml/feature/Bucketizer.scala | 13 +++--- .../spark/ml/feature/BucketizerSuite.scala | 44 +++++++++++++++++++ 2 files changed, 50 insertions(+), 7 deletions(-) create mode 100644 mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index d9880cf11d9a0..3a3f868d4ed74 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.feature import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml.Transformer -import org.apache.spark.ml.attribute.{NominalAttribute, BinaryAttribute} +import org.apache.spark.ml.attribute.NominalAttribute import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} import org.apache.spark.ml.util.SchemaUtils @@ -29,18 +29,17 @@ import org.apache.spark.sql.types.{DoubleType, StructType} /** * :: AlphaComponent :: - * Binarize a column of continuous features given a threshold. + * `Bucketizer` maps a column of continuous features to a column of feature buckets. */ @AlphaComponent final class Bucketizer extends Transformer with HasInputCol with HasOutputCol { /** - * Param for threshold used to binarize continuous features. - * The features greater than the threshold, will be binarized to 1.0. - * The features equal to or less than the threshold, will be binarized to 0.0. + * Parameter for mapping continuous features into buckets. * @group param */ - val buckets: Param[Array[Double]] = new Param[Array[Double]](this, "buckets", "") + val buckets: Param[Array[Double]] = new Param[Array[Double]](this, "buckets", + "Map continuous features into buckets.") /** @group getParam */ def getBuckets: Array[Double] = $(buckets) @@ -64,7 +63,7 @@ final class Bucketizer extends Transformer with HasInputCol with HasOutputCol { } /** - * Binary searching in several bins to place each data point. + * Binary searching in several buckets to place each data point. 
*/ private def binarySearchForBins(splits: Array[Double], feature: Double): Double = { val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala new file mode 100644 index 0000000000000..68c4ffbf5a73b --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.scalatest.FunSuite + +class BucketizerSuite extends FunSuite with MLlibTestSparkContext { + + test("Bucket continuous features with setter") { + val sqlContext = new SQLContext(sc) + val data = Array(0.1, -0.5, 0.2, -0.3, 0.8, 0.7, -0.1, -0.4) + val buckets = Array(-0.5, 0.0, 0.5) + val bucketizedData = Array(2.0, 0.0, 2.0, 1.0, 3.0, 3.0, 1.0, 1.0) + val dataFrame: DataFrame = sqlContext.createDataFrame( + data.zip(bucketizedData)).toDF("feature", "expected") + + val bucketizer: Bucketizer = new Bucketizer() + .setInputCol("feature") + .setOutputCol("result") + .setBuckets(buckets) + + bucketizer.transform(dataFrame).select("result", "expected").collect().foreach { + case Row(x: Double, y: Double) => + assert(x === y, "The feature value is not correct after bucketing.") + } + } +} From 998bc87e43c26a1d4890eae8ceb13f057171d58c Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 7 May 2015 21:01:04 +0800 Subject: [PATCH 03/12] check buckets --- .../apache/spark/ml/feature/Bucketizer.scala | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index 3a3f868d4ed74..b3d8b17cecdfd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -34,12 +34,30 @@ import org.apache.spark.sql.types.{DoubleType, StructType} @AlphaComponent final class Bucketizer extends Transformer with HasInputCol with HasOutputCol { + /** + * The given buckets should match 1) its size is larger than zero; 2) it is ordered in a non-DESC + * way. 
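+   * For example (illustrative values): Array(-0.5, 0.0, 0.5) is accepted,
+   * while Array() and Array(0.5, 0.0) are rejected. Because the comparison
+   * below is non-strict (prevValue <= currValue), duplicates such as
+   * Array(0.0, 0.0) still pass this check.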
+ */ + private def checkBuckets(buckets: Array[Double]): Boolean = { + if (buckets.size == 0) false + else if (buckets.size == 1) true + else { + buckets.foldLeft((true, Double.MinValue)) { case ((validator, prevValue), currValue) => + if (validator & prevValue <= currValue) { + (true, currValue) + } else { + (false, currValue) + } + }._1 + } + } + /** * Parameter for mapping continuous features into buckets. * @group param */ val buckets: Param[Array[Double]] = new Param[Array[Double]](this, "buckets", - "Map continuous features into buckets.") + "Split points for mapping continuous features into buckets.", checkBuckets) /** @group getParam */ def getBuckets: Array[Double] = $(buckets) @@ -55,7 +73,7 @@ final class Bucketizer extends Transformer with HasInputCol with HasOutputCol { override def transform(dataset: DataFrame): DataFrame = { transformSchema(dataset.schema) - val bucketizer = udf { feature: Double => binarySearchForBins($(buckets), feature) } + val bucketizer = udf { feature: Double => binarySearchForBuckets($(buckets), feature) } val outputColName = $(outputCol) val metadata = NominalAttribute.defaultAttr .withName(outputColName).withValues($(buckets).map(_.toString)).toMetadata() @@ -65,7 +83,7 @@ final class Bucketizer extends Transformer with HasInputCol with HasOutputCol { /** * Binary searching in several buckets to place each data point. */ - private def binarySearchForBins(splits: Array[Double], feature: Double): Double = { + private def binarySearchForBuckets(splits: Array[Double], feature: Double): Double = { val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) var left = 0 var right = wrappedSplits.length - 2 From 11fb00a71e713fe10df633909f361fc44f73ddc8 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 7 May 2015 22:38:39 +0800 Subject: [PATCH 04/12] change it into an Estimator --- .../main/scala/org/apache/spark/ml/feature/Bucketizer.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index b3d8b17cecdfd..ef41ac996195f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -18,11 +18,11 @@ package org.apache.spark.ml.feature import org.apache.spark.annotation.AlphaComponent -import org.apache.spark.ml.Transformer import org.apache.spark.ml.attribute.NominalAttribute import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} import org.apache.spark.ml.util.SchemaUtils +import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DoubleType, StructType} @@ -32,7 +32,8 @@ import org.apache.spark.sql.types.{DoubleType, StructType} * `Bucketizer` maps a column of continuous features to a column of feature buckets. 
*/ @AlphaComponent -final class Bucketizer extends Transformer with HasInputCol with HasOutputCol { +final class Bucketizer(override val parent: Estimator[Bucketizer] = null) + extends Model[Bucketizer] with HasInputCol with HasOutputCol { /** * The given buckets should match 1) its size is larger than zero; 2) it is ordered in a non-DESC From 246632222054c0f349b9c4aff84300882915d36e Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Fri, 8 May 2015 06:26:00 +0800 Subject: [PATCH 05/12] refactor Bucketizer --- .../apache/spark/ml/feature/Bucketizer.scala | 130 +++++++++++------- .../spark/ml/feature/BucketizerSuite.scala | 9 +- 2 files changed, 88 insertions(+), 51 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index ef41ac996195f..018a492f75b7f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -25,46 +25,55 @@ import org.apache.spark.ml.util.SchemaUtils import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.sql._ import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{DoubleType, StructType} +import org.apache.spark.sql.types.{DoubleType, StructField, StructType} /** * :: AlphaComponent :: * `Bucketizer` maps a column of continuous features to a column of feature buckets. */ @AlphaComponent -final class Bucketizer(override val parent: Estimator[Bucketizer] = null) +private[ml] final class Bucketizer(override val parent: Estimator[Bucketizer]) extends Model[Bucketizer] with HasInputCol with HasOutputCol { - /** - * The given buckets should match 1) its size is larger than zero; 2) it is ordered in a non-DESC - * way. - */ - private def checkBuckets(buckets: Array[Double]): Boolean = { - if (buckets.size == 0) false - else if (buckets.size == 1) true - else { - buckets.foldLeft((true, Double.MinValue)) { case ((validator, prevValue), currValue) => - if (validator & prevValue <= currValue) { - (true, currValue) - } else { - (false, currValue) - } - }._1 - } - } + def this() = this(null) /** - * Parameter for mapping continuous features into buckets. + * Parameter for mapping continuous features into buckets. With n splits, there are n+1 buckets. + * A bucket defined by splits x,y holds values in the range (x,y]. * @group param */ - val buckets: Param[Array[Double]] = new Param[Array[Double]](this, "buckets", - "Split points for mapping continuous features into buckets.", checkBuckets) + val splits: Param[Array[Double]] = new Param[Array[Double]](this, "splits", + "Split points for mapping continuous features into buckets. With n splits, there are n+1" + + " buckets. 
A bucket defined by splits x,y holds values in the range (x,y].", + Bucketizer.checkSplits) /** @group getParam */ - def getBuckets: Array[Double] = $(buckets) + def getSplits: Array[Double] = $(splits) /** @group setParam */ - def setBuckets(value: Array[Double]): this.type = set(buckets, value) + def setSplits(value: Array[Double]): this.type = set(splits, value) + + /** @group Param */ + val lowerInclusive: BooleanParam = new BooleanParam(this, "lowerInclusive", + "An indicator of the inclusiveness of negative infinite.") + setDefault(lowerInclusive -> true) + + /** @group getParam */ + def getLowerInclusive: Boolean = $(lowerInclusive) + + /** @group setParam */ + def setLowerInclusive(value: Boolean): this.type = set(lowerInclusive, value) + + /** @group Param */ + val upperInclusive: BooleanParam = new BooleanParam(this, "upperInclusive", + "An indicator of the inclusiveness of positive infinite.") + setDefault(upperInclusive -> true) + + /** @group getParam */ + def getUpperInclusive: Boolean = $(upperInclusive) + + /** @group setParam */ + def setUpperInclusive(value: Boolean): this.type = set(upperInclusive, value) /** @group setParam */ def setInputCol(value: String): this.type = set(inputCol, value) @@ -74,24 +83,61 @@ final class Bucketizer(override val parent: Estimator[Bucketizer] = null) override def transform(dataset: DataFrame): DataFrame = { transformSchema(dataset.schema) - val bucketizer = udf { feature: Double => binarySearchForBuckets($(buckets), feature) } - val outputColName = $(outputCol) - val metadata = NominalAttribute.defaultAttr - .withName(outputColName).withValues($(buckets).map(_.toString)).toMetadata() - dataset.select(col("*"), bucketizer(dataset($(inputCol))).as(outputColName, metadata)) + val wrappedSplits = Array(Double.MinValue) ++ $(splits) ++ Array(Double.MaxValue) + val bucketizer = udf { feature: Double => + Bucketizer.binarySearchForBuckets(wrappedSplits, feature) } + val newCol = bucketizer(dataset($(inputCol))) + val newField = prepOutputField(dataset.schema) + dataset.withColumn($(outputCol), newCol.as($(outputCol), newField.metadata)) + } + + private def prepOutputField(schema: StructType): StructField = { + val attr = new NominalAttribute( + name = Some($(outputCol)), + isOrdinal = Some(true), + numValues = Some($(splits).size), + values = Some($(splits).map(_.toString))) + + attr.toStructField() + } + + override def transformSchema(schema: StructType): StructType = { + SchemaUtils.checkColumnType(schema, $(inputCol), DoubleType) + require(schema.fields.forall(_.name != $(outputCol)), + s"Output column ${$(outputCol)} already exists.") + StructType(schema.fields :+ prepOutputField(schema)) + } +} + +object Bucketizer { + /** + * The given splits should match 1) its size is larger than zero; 2) it is ordered in a strictly + * increasing way. + */ + private def checkSplits(splits: Array[Double]): Boolean = { + if (splits.size == 0) false + else if (splits.size == 1) true + else { + splits.foldLeft((true, Double.MinValue)) { case ((validator, prevValue), currValue) => + if (validator && prevValue < currValue) { + (true, currValue) + } else { + (false, currValue) + } + }._1 + } } /** * Binary searching in several buckets to place each data point. 
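+   * Note that the splits passed to this method are expected to already be
+   * wrapped with Double.MinValue / Double.MaxValue sentinels, as done by
+   * `transform` above; if no bucket matches, an exception is thrown.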
*/ - private def binarySearchForBuckets(splits: Array[Double], feature: Double): Double = { - val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) + private[feature] def binarySearchForBuckets(splits: Array[Double], feature: Double): Double = { var left = 0 - var right = wrappedSplits.length - 2 + var right = splits.length - 2 while (left <= right) { val mid = left + (right - left) / 2 - val split = wrappedSplits(mid) - if ((feature > split) && (feature <= wrappedSplits(mid + 1))) { + val split = splits(mid) + if ((feature > split) && (feature <= splits(mid + 1))) { return mid } else if (feature <= split) { right = mid - 1 @@ -99,20 +145,6 @@ final class Bucketizer(override val parent: Estimator[Bucketizer] = null) left = mid + 1 } } - -1 - } - - override def transformSchema(schema: StructType): StructType = { - SchemaUtils.checkColumnType(schema, $(inputCol), DoubleType) - - val inputFields = schema.fields - val outputColName = $(outputCol) - - require(inputFields.forall(_.name != outputColName), - s"Output column $outputColName already exists.") - - val attr = NominalAttribute.defaultAttr.withName(outputColName) - val outputFields = inputFields :+ attr.toStructField() - StructType(outputFields) + throw new Exception("Failed to find a bucket.") } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala index 68c4ffbf5a73b..a89d9bbedb9f3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala @@ -17,9 +17,10 @@ package org.apache.spark.ml.feature +import org.scalatest.FunSuite + import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.scalatest.FunSuite class BucketizerSuite extends FunSuite with MLlibTestSparkContext { @@ -34,11 +35,15 @@ class BucketizerSuite extends FunSuite with MLlibTestSparkContext { val bucketizer: Bucketizer = new Bucketizer() .setInputCol("feature") .setOutputCol("result") - .setBuckets(buckets) + .setSplits(buckets) bucketizer.transform(dataFrame).select("result", "expected").collect().foreach { case Row(x: Double, y: Double) => assert(x === y, "The feature value is not correct after bucketing.") } } + + test("Binary search for finding buckets") { + + } } From fb30d792f3b67b5915ea1ec985a35f0dbd1ea08c Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Fri, 8 May 2015 07:42:07 +0800 Subject: [PATCH 06/12] fix and test binary search --- .../apache/spark/ml/feature/Bucketizer.scala | 23 +++++++++++------ .../spark/ml/feature/BucketizerSuite.scala | 25 +++++++++++++++++-- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index 018a492f75b7f..1b5af2b31538a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -39,12 +39,12 @@ private[ml] final class Bucketizer(override val parent: Estimator[Bucketizer]) /** * Parameter for mapping continuous features into buckets. With n splits, there are n+1 buckets. - * A bucket defined by splits x,y holds values in the range (x,y]. + * A bucket defined by splits x,y holds values in the range [x,y). 
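+   * For example (illustrative values): with splits Array(-0.5, 0.0, 0.5), a
+   * feature of exactly -0.5 now falls in the bucket covering [-0.5, 0.0)
+   * rather than the one ending at -0.5, as it would under (x,y] semantics.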
* @group param */ val splits: Param[Array[Double]] = new Param[Array[Double]](this, "splits", "Split points for mapping continuous features into buckets. With n splits, there are n+1" + - " buckets. A bucket defined by splits x,y holds values in the range (x,y].", + " buckets. A bucket defined by splits x,y holds values in the range [x,y).", Bucketizer.checkSplits) /** @group getParam */ @@ -85,7 +85,8 @@ private[ml] final class Bucketizer(override val parent: Estimator[Bucketizer]) transformSchema(dataset.schema) val wrappedSplits = Array(Double.MinValue) ++ $(splits) ++ Array(Double.MaxValue) val bucketizer = udf { feature: Double => - Bucketizer.binarySearchForBuckets(wrappedSplits, feature) } + Bucketizer + .binarySearchForBuckets(wrappedSplits, feature, $(lowerInclusive), $(upperInclusive)) } val newCol = bucketizer(dataset($(inputCol))) val newField = prepOutputField(dataset.schema) dataset.withColumn($(outputCol), newCol.as($(outputCol), newField.metadata)) @@ -95,7 +96,6 @@ private[ml] final class Bucketizer(override val parent: Estimator[Bucketizer]) val attr = new NominalAttribute( name = Some($(outputCol)), isOrdinal = Some(true), - numValues = Some($(splits).size), values = Some($(splits).map(_.toString))) attr.toStructField() @@ -131,20 +131,27 @@ object Bucketizer { /** * Binary searching in several buckets to place each data point. */ - private[feature] def binarySearchForBuckets(splits: Array[Double], feature: Double): Double = { + private[feature] def binarySearchForBuckets( + splits: Array[Double], + feature: Double, + lowerInclusive: Boolean, + upperInclusive: Boolean): Double = { + if ((feature < splits.head && !lowerInclusive) || (feature > splits.last && !upperInclusive)) + throw new Exception(s"Feature $feature out of bound, check your features or loose the" + + s" lower/upper bound constraint.") var left = 0 var right = splits.length - 2 while (left <= right) { val mid = left + (right - left) / 2 val split = splits(mid) - if ((feature > split) && (feature <= splits(mid + 1))) { + if ((feature >= split) && (feature < splits(mid + 1))) { return mid - } else if (feature <= split) { + } else if (feature < split) { right = mid - 1 } else { left = mid + 1 } } - throw new Exception("Failed to find a bucket.") + throw new Exception(s"Failed to find a bucket for feature $feature.") } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala index a89d9bbedb9f3..d549d03dc6872 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala @@ -17,18 +17,22 @@ package org.apache.spark.ml.feature +import scala.util.Random + import org.scalatest.FunSuite +import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.sql.{DataFrame, Row, SQLContext} class BucketizerSuite extends FunSuite with MLlibTestSparkContext { test("Bucket continuous features with setter") { val sqlContext = new SQLContext(sc) - val data = Array(0.1, -0.5, 0.2, -0.3, 0.8, 0.7, -0.1, -0.4) + val data = Array(0.1, -0.5, 0.2, -0.3, 0.8, 0.7, -0.1, -0.4, -0.9) val buckets = Array(-0.5, 0.0, 0.5) - val bucketizedData = Array(2.0, 0.0, 2.0, 1.0, 3.0, 3.0, 1.0, 1.0) + val bucketizedData = Array(2.0, 1.0, 2.0, 1.0, 3.0, 3.0, 1.0, 1.0, 0.0) val dataFrame: DataFrame = sqlContext.createDataFrame( 
data.zip(bucketizedData)).toDF("feature", "expected") @@ -44,6 +48,23 @@ class BucketizerSuite extends FunSuite with MLlibTestSparkContext { } test("Binary search for finding buckets") { + val data = Array.fill[Double](100)(Random.nextDouble()) + val splits = Array.fill[Double](10)(Random.nextDouble()).sorted + val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) + val bsResult = Vectors.dense( + data.map(x => Bucketizer.binarySearchForBuckets(wrappedSplits, x, true, true))) + val lsResult = Vectors.dense(data.map(x => BucketizerSuite.linearSearchForBuckets(splits, x))) + assert(bsResult ~== lsResult absTol 1e-5) + } +} +object BucketizerSuite { + private def linearSearchForBuckets(splits: Array[Double], feature: Double): Double = { + var i = 0 + while (i < splits.size) { + if (feature < splits(i)) return i + i += 1 + } + i } } From ac778591ad55845912dc6608d8de5e2935c95f73 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Fri, 8 May 2015 08:08:38 +0800 Subject: [PATCH 07/12] fix style error --- .../main/scala/org/apache/spark/ml/feature/Bucketizer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index 1b5af2b31538a..2b123371ae11e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -136,9 +136,10 @@ object Bucketizer { feature: Double, lowerInclusive: Boolean, upperInclusive: Boolean): Double = { - if ((feature < splits.head && !lowerInclusive) || (feature > splits.last && !upperInclusive)) + if ((feature < splits.head && !lowerInclusive) || (feature > splits.last && !upperInclusive)) { throw new Exception(s"Feature $feature out of bound, check your features or loose the" + s" lower/upper bound constraint.") + } var left = 0 var right = splits.length - 2 while (left <= right) { From 3a16cc2f1836dbd8fe22f6603c641ba3aedf5bea Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Fri, 8 May 2015 11:18:54 +0800 Subject: [PATCH 08/12] refine comments and names --- .../apache/spark/ml/feature/Bucketizer.scala | 42 ++++++++++++------- .../spark/ml/feature/BucketizerSuite.scala | 6 +-- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index 2b123371ae11e..eeace85d198dc 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -32,19 +32,21 @@ import org.apache.spark.sql.types.{DoubleType, StructField, StructType} * `Bucketizer` maps a column of continuous features to a column of feature buckets. */ @AlphaComponent -private[ml] final class Bucketizer(override val parent: Estimator[Bucketizer]) +final class Bucketizer private[ml] (override val parent: Estimator[Bucketizer]) extends Model[Bucketizer] with HasInputCol with HasOutputCol { def this() = this(null) /** * Parameter for mapping continuous features into buckets. With n splits, there are n+1 buckets. - * A bucket defined by splits x,y holds values in the range [x,y). + * A bucket defined by splits x,y holds values in the range [x,y). Note that the splits should be + * strictly increasing. 
* @group param */ val splits: Param[Array[Double]] = new Param[Array[Double]](this, "splits", - "Split points for mapping continuous features into buckets. With n splits, there are n+1" + - " buckets. A bucket defined by splits x,y holds values in the range [x,y).", + "Split points for mapping continuous features into buckets. With n splits, there are n+1 " + + "buckets. A bucket defined by splits x,y holds values in the range [x,y). The splits " + + "should be strictly increasing.", Bucketizer.checkSplits) /** @group getParam */ @@ -53,9 +55,15 @@ private[ml] final class Bucketizer(override val parent: Estimator[Bucketizer]) /** @group setParam */ def setSplits(value: Array[Double]): this.type = set(splits, value) - /** @group Param */ + /** + * An indicator of the inclusiveness of negative infinite. If true, then use implicit bin + * (-inf, getSplits.head). If false, then throw exception if values < getSplits.head are + * encountered. + * @group Param */ val lowerInclusive: BooleanParam = new BooleanParam(this, "lowerInclusive", - "An indicator of the inclusiveness of negative infinite.") + "An indicator of the inclusiveness of negative infinite. If true, then use implicit bin " + + "(-inf, getSplits.head). If false, then throw exception if values < getSplits.head are " + + "encountered.") setDefault(lowerInclusive -> true) /** @group getParam */ @@ -64,9 +72,15 @@ private[ml] final class Bucketizer(override val parent: Estimator[Bucketizer]) /** @group setParam */ def setLowerInclusive(value: Boolean): this.type = set(lowerInclusive, value) - /** @group Param */ + /** + * An indicator of the inclusiveness of positive infinite. If true, then use implicit bin + * [getSplits.last, inf). If false, then throw exception if values > getSplits.last are + * encountered. + * @group Param */ val upperInclusive: BooleanParam = new BooleanParam(this, "upperInclusive", - "An indicator of the inclusiveness of positive infinite.") + "An indicator of the inclusiveness of positive infinite. If true, then use implicit bin " + + "[getSplits.last, inf). If false, then throw exception if values > getSplits.last are " + + "encountered.") setDefault(upperInclusive -> true) /** @group getParam */ @@ -93,9 +107,7 @@ private[ml] final class Bucketizer(override val parent: Estimator[Bucketizer]) } private def prepOutputField(schema: StructType): StructField = { - val attr = new NominalAttribute( - name = Some($(outputCol)), - isOrdinal = Some(true), + val attr = new NominalAttribute(name = Some($(outputCol)), isOrdinal = Some(true), values = Some($(splits).map(_.toString))) attr.toStructField() @@ -109,7 +121,7 @@ private[ml] final class Bucketizer(override val parent: Estimator[Bucketizer]) } } -object Bucketizer { +private[feature] object Bucketizer { /** * The given splits should match 1) its size is larger than zero; 2) it is ordered in a strictly * increasing way. 
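+   * For example (illustrative): Array(-0.5, 0.0, 0.5) passes, while
+   * Array(0.0, 0.0) is rejected, since equal adjacent split points would
+   * define an empty bucket.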
@@ -137,8 +149,8 @@ object Bucketizer { lowerInclusive: Boolean, upperInclusive: Boolean): Double = { if ((feature < splits.head && !lowerInclusive) || (feature > splits.last && !upperInclusive)) { - throw new Exception(s"Feature $feature out of bound, check your features or loose the" + - s" lower/upper bound constraint.") + throw new RuntimeException(s"Feature $feature out of bound, check your features or loosen " + + s"the lower/upper bound constraint.") } var left = 0 var right = splits.length - 2 @@ -153,6 +165,6 @@ object Bucketizer { left = mid + 1 } } - throw new Exception(s"Failed to find a bucket for feature $feature.") + throw new RuntimeException(s"Unexpected error: failed to find a bucket for feature $feature.") } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala index d549d03dc6872..d34ad525e9724 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala @@ -31,7 +31,7 @@ class BucketizerSuite extends FunSuite with MLlibTestSparkContext { test("Bucket continuous features with setter") { val sqlContext = new SQLContext(sc) val data = Array(0.1, -0.5, 0.2, -0.3, 0.8, 0.7, -0.1, -0.4, -0.9) - val buckets = Array(-0.5, 0.0, 0.5) + val splits = Array(-0.5, 0.0, 0.5) val bucketizedData = Array(2.0, 1.0, 2.0, 1.0, 3.0, 3.0, 1.0, 1.0, 0.0) val dataFrame: DataFrame = sqlContext.createDataFrame( data.zip(bucketizedData)).toDF("feature", "expected") @@ -39,7 +39,7 @@ class BucketizerSuite extends FunSuite with MLlibTestSparkContext { val bucketizer: Bucketizer = new Bucketizer() .setInputCol("feature") .setOutputCol("result") - .setSplits(buckets) + .setSplits(splits) bucketizer.transform(dataFrame).select("result", "expected").collect().foreach { case Row(x: Double, y: Double) => @@ -58,7 +58,7 @@ class BucketizerSuite extends FunSuite with MLlibTestSparkContext { } } -object BucketizerSuite { +private object BucketizerSuite { private def linearSearchForBuckets(splits: Array[Double], feature: Double): Double = { var i = 0 while (i < splits.size) { From c3cc770331b733a9a103d95f215ea559a8463f19 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Fri, 8 May 2015 11:45:22 +0800 Subject: [PATCH 09/12] add more unit test for binary search --- .../spark/ml/feature/BucketizerSuite.scala | 36 +++++++++++++++++-- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala index d34ad525e9724..8be5421bfff64 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala @@ -47,15 +47,45 @@ class BucketizerSuite extends FunSuite with MLlibTestSparkContext { } } - test("Binary search for finding buckets") { - val data = Array.fill[Double](100)(Random.nextDouble()) - val splits = Array.fill[Double](10)(Random.nextDouble()).sorted + test("Binary search correctness in contrast with linear search") { + val data = Array.fill(100)(Random.nextDouble()) + val splits = Array.fill(10)(Random.nextDouble()).sorted val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) val bsResult = Vectors.dense( data.map(x => Bucketizer.binarySearchForBuckets(wrappedSplits, x, true, true))) val lsResult = Vectors.dense(data.map(x => 
BucketizerSuite.linearSearchForBuckets(splits, x))) assert(bsResult ~== lsResult absTol 1e-5) } + + test("Binary search of features at splits") { + val splits = Array.fill(10)(Random.nextDouble()).sorted + val data = splits + val expected = Vectors.dense(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0) + val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) + val result = Vectors.dense( + data.map(x => Bucketizer.binarySearchForBuckets(wrappedSplits, x, true, true))) + assert(result ~== expected absTol 1e-5) + } + + test("Binary search of features between splits") { + val data = Array.fill(10)(Random.nextDouble()) + val splits = Array(-0.1, 1.1) + val expected = Vectors.dense(Array.fill(10)(1.0)) + val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) + val result = Vectors.dense( + data.map(x => Bucketizer.binarySearchForBuckets(wrappedSplits, x, true, true))) + assert(result ~== expected absTol 1e-5) + } + + test("Binary search of features outside splits") { + val data = Array.fill(5)(Random.nextDouble() + 1.1) ++ Array.fill(5)(Random.nextDouble() - 1.1) + val splits = Array(0.0, 1.1) + val expected = Vectors.dense(Array.fill(5)(2.0) ++ Array.fill(5)(0.0)) + val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) + val result = Vectors.dense( + data.map(x => Bucketizer.binarySearchForBuckets(wrappedSplits, x, true, true))) + assert(result ~== expected absTol 1e-5) + } } private object BucketizerSuite { From eacfcfae55f4e1231ae70691dd4d557af73aa4f7 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Fri, 8 May 2015 12:02:13 +0800 Subject: [PATCH 10/12] change ML attribute from splits into buckets --- .../org/apache/spark/ml/feature/Bucketizer.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index eeace85d198dc..1a476bacf1043 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -107,9 +107,16 @@ final class Bucketizer private[ml] (override val parent: Estimator[Bucketizer]) } private def prepOutputField(schema: StructType): StructField = { - val attr = new NominalAttribute(name = Some($(outputCol)), isOrdinal = Some(true), - values = Some($(splits).map(_.toString))) - + val innerRanges = $(splits).sliding(2).map(bucket => bucket.mkString(", ")).toArray + val values = ($(lowerInclusive), $(upperInclusive)) match { + case (true, true) => + Array(s"-inf, ${$(splits).head}") ++ innerRanges ++ Array(s"${$(splits).last}, inf") + case (true, false) => Array(s"-inf, ${$(splits).head}") ++ innerRanges + case (false, true) => innerRanges ++ Array(s"${$(splits).last}, inf") + case _ => innerRanges + } + val attr = + new NominalAttribute(name = Some($(outputCol)), isOrdinal = Some(true), values = Some(values)) attr.toStructField() } From 34f124a1b7069d643c3496168b867f2fdde87257 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Mon, 11 May 2015 12:10:38 -0700 Subject: [PATCH 11/12] Removed lowerInclusive, upperInclusive params from Bucketizer, and used splits instead. 
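
With splits now required to have length >= 3, be strictly increasing, and
explicitly include Double.NegativeInfinity / Double.PositiveInfinity whenever
unbounded ranges are desired, a typical configuration looks like the
following sketch (illustrative only; the column names and split values are
made up):

    val bucketizer = new Bucketizer()
      .setInputCol("feature")
      .setOutputCol("result")
      .setSplits(Array(Double.NegativeInfinity, -0.5, 0.0, 0.5,
        Double.PositiveInfinity))

Feature values outside [splits.head, splits.last) now raise a
RuntimeException rather than being placed in implicit outer buckets.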
--- .../apache/spark/ml/feature/Bucketizer.scala | 120 +++++----------- .../apache/spark/ml/util/SchemaUtils.scala | 11 ++ .../spark/ml/feature/BucketizerSuite.scala | 136 ++++++++++++------ 3 files changed, 139 insertions(+), 128 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index 1a476bacf1043..7dba64bc3506f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -39,14 +39,17 @@ final class Bucketizer private[ml] (override val parent: Estimator[Bucketizer]) /** * Parameter for mapping continuous features into buckets. With n splits, there are n+1 buckets. - * A bucket defined by splits x,y holds values in the range [x,y). Note that the splits should be - * strictly increasing. + * A bucket defined by splits x,y holds values in the range [x,y). Splits should be strictly + * increasing. Values at -inf, inf must be explicitly provided to cover all Double values; + * otherwise, values outside the splits specified will be treated as errors. * @group param */ val splits: Param[Array[Double]] = new Param[Array[Double]](this, "splits", "Split points for mapping continuous features into buckets. With n splits, there are n+1 " + "buckets. A bucket defined by splits x,y holds values in the range [x,y). The splits " + - "should be strictly increasing.", + "should be strictly increasing. Values at -inf, inf must be explicitly provided to cover" + + " all Double values; otherwise, values outside the splits specified will be treated as" + + " errors.", Bucketizer.checkSplits) /** @group getParam */ @@ -55,40 +58,6 @@ final class Bucketizer private[ml] (override val parent: Estimator[Bucketizer]) /** @group setParam */ def setSplits(value: Array[Double]): this.type = set(splits, value) - /** - * An indicator of the inclusiveness of negative infinite. If true, then use implicit bin - * (-inf, getSplits.head). If false, then throw exception if values < getSplits.head are - * encountered. - * @group Param */ - val lowerInclusive: BooleanParam = new BooleanParam(this, "lowerInclusive", - "An indicator of the inclusiveness of negative infinite. If true, then use implicit bin " + - "(-inf, getSplits.head). If false, then throw exception if values < getSplits.head are " + - "encountered.") - setDefault(lowerInclusive -> true) - - /** @group getParam */ - def getLowerInclusive: Boolean = $(lowerInclusive) - - /** @group setParam */ - def setLowerInclusive(value: Boolean): this.type = set(lowerInclusive, value) - - /** - * An indicator of the inclusiveness of positive infinite. If true, then use implicit bin - * [getSplits.last, inf). If false, then throw exception if values > getSplits.last are - * encountered. - * @group Param */ - val upperInclusive: BooleanParam = new BooleanParam(this, "upperInclusive", - "An indicator of the inclusiveness of positive infinite. If true, then use implicit bin " + - "[getSplits.last, inf). 
If false, then throw exception if values > getSplits.last are " + - "encountered.") - setDefault(upperInclusive -> true) - - /** @group getParam */ - def getUpperInclusive: Boolean = $(upperInclusive) - - /** @group setParam */ - def setUpperInclusive(value: Boolean): this.type = set(upperInclusive, value) - /** @group setParam */ def setInputCol(value: String): this.type = set(inputCol, value) @@ -97,81 +66,66 @@ final class Bucketizer private[ml] (override val parent: Estimator[Bucketizer]) override def transform(dataset: DataFrame): DataFrame = { transformSchema(dataset.schema) - val wrappedSplits = Array(Double.MinValue) ++ $(splits) ++ Array(Double.MaxValue) val bucketizer = udf { feature: Double => - Bucketizer - .binarySearchForBuckets(wrappedSplits, feature, $(lowerInclusive), $(upperInclusive)) } + Bucketizer.binarySearchForBuckets($(splits), feature) + } val newCol = bucketizer(dataset($(inputCol))) val newField = prepOutputField(dataset.schema) dataset.withColumn($(outputCol), newCol.as($(outputCol), newField.metadata)) } private def prepOutputField(schema: StructType): StructField = { - val innerRanges = $(splits).sliding(2).map(bucket => bucket.mkString(", ")).toArray - val values = ($(lowerInclusive), $(upperInclusive)) match { - case (true, true) => - Array(s"-inf, ${$(splits).head}") ++ innerRanges ++ Array(s"${$(splits).last}, inf") - case (true, false) => Array(s"-inf, ${$(splits).head}") ++ innerRanges - case (false, true) => innerRanges ++ Array(s"${$(splits).last}, inf") - case _ => innerRanges - } - val attr = - new NominalAttribute(name = Some($(outputCol)), isOrdinal = Some(true), values = Some(values)) + val buckets = $(splits).sliding(2).map(bucket => bucket.mkString(", ")).toArray + val attr = new NominalAttribute(name = Some($(outputCol)), isOrdinal = Some(true), + values = Some(buckets)) attr.toStructField() } override def transformSchema(schema: StructType): StructType = { SchemaUtils.checkColumnType(schema, $(inputCol), DoubleType) - require(schema.fields.forall(_.name != $(outputCol)), - s"Output column ${$(outputCol)} already exists.") - StructType(schema.fields :+ prepOutputField(schema)) + SchemaUtils.appendColumn(schema, prepOutputField(schema)) } } private[feature] object Bucketizer { - /** - * The given splits should match 1) its size is larger than zero; 2) it is ordered in a strictly - * increasing way. - */ - private def checkSplits(splits: Array[Double]): Boolean = { - if (splits.size == 0) false - else if (splits.size == 1) true - else { - splits.foldLeft((true, Double.MinValue)) { case ((validator, prevValue), currValue) => - if (validator && prevValue < currValue) { - (true, currValue) - } else { - (false, currValue) - } - }._1 + /** We require splits to be of length >= 3 and to be in strictly increasing order. */ + def checkSplits(splits: Array[Double]): Boolean = { + if (splits.length < 3) { + false + } else { + var i = 0 + while (i < splits.length - 1) { + if (splits(i) >= splits(i + 1)) return false + i += 1 + } + true } } /** * Binary searching in several buckets to place each data point. 
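+   * The two bounds converge until left == right, which indexes the bucket
+   * [splits(left), splits(left + 1)) containing the feature. For example
+   * (illustrative trace): with splits [-inf, -0.5, 0.0, 0.5, inf] and feature
+   * 0.2, the probes are splits(2) = 0.0 (0.2 >= 0.0, so left = 2) and then
+   * splits(3) = 0.5 (0.2 < 0.5, so right = 2), returning bucket 2.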
+ * @throws RuntimeException if a feature is < splits.head or >= splits.last */ - private[feature] def binarySearchForBuckets( + def binarySearchForBuckets( splits: Array[Double], - feature: Double, - lowerInclusive: Boolean, - upperInclusive: Boolean): Double = { - if ((feature < splits.head && !lowerInclusive) || (feature > splits.last && !upperInclusive)) { - throw new RuntimeException(s"Feature $feature out of bound, check your features or loosen " + - s"the lower/upper bound constraint.") + feature: Double): Double = { + // Check bounds. We make an exception for +inf so that it can exist in some bin. + if ((feature < splits.head) || (feature >= splits.last && feature != Double.PositiveInfinity)) { + throw new RuntimeException(s"Feature value $feature out of Bucketizer bounds" + + s" [${splits.head}, ${splits.last}). Check your features, or loosen " + + s"the lower/upper bound constraints.") } var left = 0 var right = splits.length - 2 - while (left <= right) { - val mid = left + (right - left) / 2 - val split = splits(mid) - if ((feature >= split) && (feature < splits(mid + 1))) { - return mid - } else if (feature < split) { - right = mid - 1 + while (left < right) { + val mid = (left + right) / 2 + val split = splits(mid + 1) + if (feature < split) { + right = mid } else { left = mid + 1 } } - throw new RuntimeException(s"Unexpected error: failed to find a bucket for feature $feature.") + left } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala index 0383bf0b382b7..11592b77eb356 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala @@ -58,4 +58,15 @@ object SchemaUtils { val outputFields = schema.fields :+ StructField(colName, dataType, nullable = false) StructType(outputFields) } + + /** + * Appends a new column to the input schema. This fails if the given output column already exists. + * @param schema input schema + * @param col New column schema + * @return new schema with the input column appended + */ + def appendColumn(schema: StructType, col: StructField): StructType = { + require(!schema.fieldNames.contains(col.name), s"Column ${col.name} already exists.") + StructType(schema.fields :+ col) + } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala index 8be5421bfff64..77b0b75391201 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala @@ -21,6 +21,7 @@ import scala.util.Random import org.scalatest.FunSuite +import org.apache.spark.SparkException import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ @@ -28,13 +29,20 @@ import org.apache.spark.sql.{DataFrame, Row, SQLContext} class BucketizerSuite extends FunSuite with MLlibTestSparkContext { - test("Bucket continuous features with setter") { - val sqlContext = new SQLContext(sc) - val data = Array(0.1, -0.5, 0.2, -0.3, 0.8, 0.7, -0.1, -0.4, -0.9) + @transient private var sqlContext: SQLContext = _ + + override def beforeAll(): Unit = { + super.beforeAll() + sqlContext = new SQLContext(sc) + } + + test("Bucket continuous features, without -inf,inf") { + // Check a set of valid feature values. 
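+    // With these splits the buckets are [-0.5, 0.0) and [0.0, 0.5); any value
+    // outside [-0.5, 0.5) should trigger the out-of-bounds error checked below.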
val splits = Array(-0.5, 0.0, 0.5) - val bucketizedData = Array(2.0, 1.0, 2.0, 1.0, 3.0, 3.0, 1.0, 1.0, 0.0) - val dataFrame: DataFrame = sqlContext.createDataFrame( - data.zip(bucketizedData)).toDF("feature", "expected") + val validData = Array(-0.5, -0.3, 0.0, 0.2) + val expectedBuckets = Array(0.0, 0.0, 1.0, 1.0) + val dataFrame: DataFrame = + sqlContext.createDataFrame(validData.zip(expectedBuckets)).toDF("feature", "expected") val bucketizer: Bucketizer = new Bucketizer() .setInputCol("feature") @@ -43,58 +51,96 @@ class BucketizerSuite extends FunSuite with MLlibTestSparkContext { bucketizer.transform(dataFrame).select("result", "expected").collect().foreach { case Row(x: Double, y: Double) => - assert(x === y, "The feature value is not correct after bucketing.") + assert(x === y, + s"The feature value is not correct after bucketing. Expected $y but found $x") } - } - test("Binary search correctness in contrast with linear search") { - val data = Array.fill(100)(Random.nextDouble()) - val splits = Array.fill(10)(Random.nextDouble()).sorted - val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) - val bsResult = Vectors.dense( - data.map(x => Bucketizer.binarySearchForBuckets(wrappedSplits, x, true, true))) - val lsResult = Vectors.dense(data.map(x => BucketizerSuite.linearSearchForBuckets(splits, x))) - assert(bsResult ~== lsResult absTol 1e-5) + // Check for exceptions when using a set of invalid feature values. + val invalidData1: Array[Double] = Array(-0.9) ++ validData + val invalidData2 = Array(0.5) ++ validData + val badDF1 = sqlContext.createDataFrame(invalidData1.zipWithIndex).toDF("feature", "idx") + intercept[RuntimeException]{ + bucketizer.transform(badDF1).collect() + println("Invalid feature value -0.9 was not caught as an invalid feature!") + } + val badDF2 = sqlContext.createDataFrame(invalidData2.zipWithIndex).toDF("feature", "idx") + intercept[RuntimeException]{ + bucketizer.transform(badDF2).collect() + println("Invalid feature value 0.5 was not caught as an invalid feature!") + } } - test("Binary search of features at splits") { - val splits = Array.fill(10)(Random.nextDouble()).sorted - val data = splits - val expected = Vectors.dense(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0) - val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) - val result = Vectors.dense( - data.map(x => Bucketizer.binarySearchForBuckets(wrappedSplits, x, true, true))) - assert(result ~== expected absTol 1e-5) + test("Bucket continuous features, with -inf,inf") { + val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity) + val validData = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9) + val expectedBuckets = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0) + val dataFrame: DataFrame = + sqlContext.createDataFrame(validData.zip(expectedBuckets)).toDF("feature", "expected") + + val bucketizer: Bucketizer = new Bucketizer() + .setInputCol("feature") + .setOutputCol("result") + .setSplits(splits) + + bucketizer.transform(dataFrame).select("result", "expected").collect().foreach { + case Row(x: Double, y: Double) => + assert(x === y, + s"The feature value is not correct after bucketing. 
Expected $y but found $x") + } } - test("Binary search of features between splits") { - val data = Array.fill(10)(Random.nextDouble()) - val splits = Array(-0.1, 1.1) - val expected = Vectors.dense(Array.fill(10)(1.0)) - val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) - val result = Vectors.dense( - data.map(x => Bucketizer.binarySearchForBuckets(wrappedSplits, x, true, true))) - assert(result ~== expected absTol 1e-5) + test("Binary search correctness on hand-picked examples") { + import BucketizerSuite.checkBinarySearch + // length 3, with -inf + checkBinarySearch(Array(Double.NegativeInfinity, 0.0, 1.0)) + // length 4 + checkBinarySearch(Array(-1.0, -0.5, 0.0, 1.0)) + // length 5 + checkBinarySearch(Array(-1.0, -0.5, 0.0, 1.0, 1.5)) + // length 3, with inf + checkBinarySearch(Array(0.0, 1.0, Double.PositiveInfinity)) + // length 3, with -inf and inf + checkBinarySearch(Array(Double.NegativeInfinity, 1.0, Double.PositiveInfinity)) } - test("Binary search of features outside splits") { - val data = Array.fill(5)(Random.nextDouble() + 1.1) ++ Array.fill(5)(Random.nextDouble() - 1.1) - val splits = Array(0.0, 1.1) - val expected = Vectors.dense(Array.fill(5)(2.0) ++ Array.fill(5)(0.0)) - val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) - val result = Vectors.dense( - data.map(x => Bucketizer.binarySearchForBuckets(wrappedSplits, x, true, true))) - assert(result ~== expected absTol 1e-5) + test("Binary search correctness in contrast with linear search, on random data") { + val data = Array.fill(100)(Random.nextDouble()) + val splits: Array[Double] = Double.NegativeInfinity +: + Array.fill(10)(Random.nextDouble()).sorted :+ Double.PositiveInfinity + val bsResult = Vectors.dense(data.map(x => Bucketizer.binarySearchForBuckets(splits, x))) + val lsResult = Vectors.dense(data.map(x => BucketizerSuite.linearSearchForBuckets(splits, x))) + assert(bsResult ~== lsResult absTol 1e-5) } } -private object BucketizerSuite { - private def linearSearchForBuckets(splits: Array[Double], feature: Double): Double = { +private object BucketizerSuite extends FunSuite { + /** Brute force search for buckets. Bucket i is defined by the range [split(i), split(i+1)). */ + def linearSearchForBuckets(splits: Array[Double], feature: Double): Double = { + require(feature >= splits.head) var i = 0 - while (i < splits.size) { - if (feature < splits(i)) return i + while (i < splits.length - 1) { + if (feature < splits(i + 1)) return i i += 1 } - i + throw new RuntimeException( + s"linearSearchForBuckets failed to find bucket for feature value $feature") + } + + /** Check all values in splits, plus values between all splits. */ + def checkBinarySearch(splits: Array[Double]): Unit = { + def testFeature(feature: Double, expectedBucket: Double): Unit = { + assert(Bucketizer.binarySearchForBuckets(splits, feature) === expectedBucket, + s"Expected feature value $feature to be in bucket $expectedBucket with splits:" + + s" ${splits.mkString(", ")}") + } + var i = 0 + while (i < splits.length - 1) { + testFeature(splits(i), i) // Split i should fall in bucket i. + testFeature((splits(i) + splits(i + 1)) / 2, i) // Value between splits i,i+1 should be in i. + i += 1 + } + if (splits.last === Double.PositiveInfinity) { + testFeature(Double.PositiveInfinity, splits.length - 2) + } } } From 1ca973a5e884944c967717d5117e36cf6c6832e4 Mon Sep 17 00:00:00 2001 From: "Joseph K. 
Bradley" Date: Mon, 11 May 2015 12:44:24 -0700 Subject: [PATCH 12/12] one more bucketizer test --- .../scala/org/apache/spark/ml/feature/BucketizerSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala index 77b0b75391201..acb46c0a35709 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala @@ -101,6 +101,8 @@ class BucketizerSuite extends FunSuite with MLlibTestSparkContext { checkBinarySearch(Array(0.0, 1.0, Double.PositiveInfinity)) // length 3, with -inf and inf checkBinarySearch(Array(Double.NegativeInfinity, 1.0, Double.PositiveInfinity)) + // length 4, with -inf and inf + checkBinarySearch(Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity)) } test("Binary search correctness in contrast with linear search, on random data") {
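
As an end-to-end illustration of the API this series converges on, here is a
hedged usage sketch against the final state of the code (not part of any patch
above; the column names, data, and split values are made up, and an existing
SparkContext `sc` is assumed):

    import org.apache.spark.ml.feature.Bucketizer
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    // A single Double column; Bucketizer.transformSchema requires DoubleType.
    val df = sqlContext.createDataFrame(
      Seq(-0.9, -0.5, 0.2, 0.8).map(Tuple1.apply)).toDF("feature")

    val bucketizer = new Bucketizer()
      .setInputCol("feature")
      .setOutputCol("bucket")
      // Explicit -inf/inf endpoints cover values outside (-0.5, 0.5).
      .setSplits(Array(Double.NegativeInfinity, -0.5, 0.0, 0.5,
        Double.PositiveInfinity))

    // Buckets: 0 -> [-inf, -0.5), 1 -> [-0.5, 0.0), 2 -> [0.0, 0.5),
    // 3 -> [0.5, inf). Here: -0.9 -> 0, -0.5 -> 1, 0.2 -> 2, 0.8 -> 3.
    bucketizer.transform(df).show()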