From f6be730d39c0cfc2173e11ae2e0832e97b137274 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 29 Apr 2015 22:02:03 +0800 Subject: [PATCH 01/12] add feature discretizer --- .../spark/ml/feature/FeatureDiscretizer.scala | 161 ++++++++++++++++++ .../ml/feature/FeatureDiscretizerSuite.scala | 53 ++++++ 2 files changed, 214 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/feature/FeatureDiscretizer.scala create mode 100644 mllib/src/test/scala/org/apache/spark/ml/feature/FeatureDiscretizerSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureDiscretizer.scala new file mode 100644 index 0000000000000..d6e4f89dc49ec --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureDiscretizer.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import scala.collection.mutable + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.attribute.NominalAttribute +import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} +import org.apache.spark.ml.param.{IntParam, ParamMap} +import org.apache.spark.ml.util.SchemaUtils +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{DoubleType, StructType} +import org.apache.spark.util.random.XORShiftRandom + +/** + * :: AlphaComponent :: + * `FeatureDiscretizer` takes a column with continuous features and outputs a column with binned + * categorical features. + */ +@AlphaComponent +class FeatureDiscretizer extends Transformer with HasInputCol with HasOutputCol { + + /** + * Number of bins to collect data points, which should be a positive integer. + * @group param + */ + val numBins = new IntParam(this, "numBins", + "Number of bins to collect data points, which should be a positive integer.") + setDefault(numBins -> 1) + + /** @group getParam */ + def getNumBins: Int = getOrDefault(numBins) + + /** @group setParam */ + def setNumBins(value: Int): this.type = set(numBins, value) + + /** @group setParam */ + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + def setOutputCol(value: String): this.type = set(outputCol, value) + + override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + val map = extractParamMap(paramMap) + assert(map(numBins) >= 1, "Number of bins should be a positive integer.") + SchemaUtils.checkColumnType(schema, map(inputCol), DoubleType) + val inputFields = schema.fields + val outputColName = map(outputCol) + require(inputFields.forall(_.name != outputColName), + s"Output column $outputColName already exists.") + val attr = NominalAttribute.defaultAttr.withName(outputColName) + val outputFields = inputFields :+ attr.toStructField() + StructType(outputFields) + } + + override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { + transformSchema(dataset.schema, paramMap) + val map = extractParamMap(paramMap) + val input = dataset.select(map(inputCol)).map { case Row(feature: Double) => feature } + val samples = getSampledInput(input, map(numBins)) + val splits = findSplits(samples, map(numBins) - 1) + val discretizer = udf { feature: Double => binarySearchForBins(splits, feature) } + val outputColName = map(outputCol) + val metadata = NominalAttribute.defaultAttr + .withName(outputColName).withValues(splits.map(_.toString)).toMetadata() + dataset.select(col("*"), + discretizer(dataset(map(inputCol))).as(outputColName, metadata)) + } + + /** + * Binary searching in several bins to place each data point. + */ + private def binarySearchForBins(splits: Array[Double], feature: Double): Double = { + val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) + var left = 0 + var right = wrappedSplits.length - 2 + while (left <= right) { + val mid = left + (right - left) / 2 + val split = wrappedSplits(mid) + if ((feature > split) && (feature <= wrappedSplits(mid + 1))) { + return mid + } else if (feature <= split) { + right = mid - 1 + } else { + left = mid + 1 + } + } + -1 + } + + /** + * Sampling from the given dataset to collect quantile statistics. + */ + private def getSampledInput(dataset: RDD[Double], numBins: Int): Array[Double] = { + val totalSamples = dataset.count() + assert(totalSamples > 0) + val requiredSamples = math.max(numBins * numBins, 10000) + val fraction = math.min(requiredSamples / dataset.count(), 1.0) + dataset.sample(withReplacement = false, fraction, new XORShiftRandom().nextInt()).collect() + } + + /** + * Compute split points with respect to the sample distribution. + */ + private def findSplits(samples: Array[Double], numSplits: Int): Array[Double] = { + val valueCountMap = samples.foldLeft(Map.empty[Double, Int]) { (m, x) => + m + ((x, m.getOrElse(x, 0) + 1)) + } + val valueCounts = valueCountMap.toSeq.sortBy(_._1).toArray + val possibleSplits = valueCounts.length + if (possibleSplits <= numSplits) { + valueCounts.map(_._1) + } else { + val stride: Double = samples.length.toDouble / (numSplits + 1) + val splitsBuilder = mutable.ArrayBuilder.make[Double] + var index = 1 + // currentCount: sum of counts of values that have been visited + var currentCount = valueCounts(0)._2 + // targetCount: target value for `currentCount`. + // If `currentCount` is closest value to `targetCount`, + // then current value is a split threshold. + // After finding a split threshold, `targetCount` is added by stride. + var targetCount = stride + while (index < valueCounts.length) { + val previousCount = currentCount + currentCount += valueCounts(index)._2 + val previousGap = math.abs(previousCount - targetCount) + val currentGap = math.abs(currentCount - targetCount) + // If adding count of current value to currentCount + // makes the gap between currentCount and targetCount smaller, + // previous value is a split threshold. + if (previousGap < currentGap) { + splitsBuilder += valueCounts(index - 1)._1 + targetCount += stride + } + index += 1 + } + splitsBuilder.result() + } + } +} + diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureDiscretizerSuite.scala new file mode 100644 index 0000000000000..fe6bc8e9b8487 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureDiscretizerSuite.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import scala.util.Random + +import org.scalatest.FunSuite + +import org.apache.spark.ml.attribute.{Attribute, NominalAttribute} +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.{Row, SQLContext} + +class FeatureDiscretizerSuite extends FunSuite with MLlibTestSparkContext { + + test("Test feature discretizer") { + val sqlContext = new SQLContext(sc) + import sqlContext.implicits._ + + val random = new Random(47) + val data = Array.fill[Double](10)(random.nextDouble()) + val result = Array[Double](2, 1, 0, 0, 1, 1, 1, 0, 2, 2) + + val df = sc.parallelize(data.zip(result)).toDF("data", "expected") + + val discretizer = new FeatureDiscretizer() + .setInputCol("data") + .setOutputCol("result") + .setNumBins(3) + + val res = discretizer.transform(df) + res.select("expected", "result").collect().foreach { + case Row(expected: Double, result: Double) => assert(expected == result) + } + + val attr = Attribute.fromStructField(res.schema("result")).asInstanceOf[NominalAttribute] + assert(attr.values.get === Array("0.18847866977771732","0.5309454508634242")) + } +} From 91e6802be8e1a0f766e0dd5f85c80f010d856c8a Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 7 May 2015 15:45:38 +0800 Subject: [PATCH 02/12] refactor it into an estimator --- ...etizer.scala => QuantileDiscretizer.scala} | 150 +++++++++++------- ...e.scala => QuantileDiscretizerSuite.scala} | 11 +- 2 files changed, 103 insertions(+), 58 deletions(-) rename mllib/src/main/scala/org/apache/spark/ml/feature/{FeatureDiscretizer.scala => QuantileDiscretizer.scala} (71%) rename mllib/src/test/scala/org/apache/spark/ml/feature/{FeatureDiscretizerSuite.scala => QuantileDiscretizerSuite.scala} (86%) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala similarity index 71% rename from mllib/src/main/scala/org/apache/spark/ml/feature/FeatureDiscretizer.scala rename to mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index d6e4f89dc49ec..cf43be9a71d64 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -20,38 +20,59 @@ package org.apache.spark.ml.feature import scala.collection.mutable import org.apache.spark.annotation.AlphaComponent -import org.apache.spark.ml.Transformer +import org.apache.spark.ml._ import org.apache.spark.ml.attribute.NominalAttribute import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} -import org.apache.spark.ml.param.{IntParam, ParamMap} +import org.apache.spark.ml.param.{IntParam, ParamMap, _} import org.apache.spark.ml.util.SchemaUtils import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DoubleType, StructType} +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.util.random.XORShiftRandom /** - * :: AlphaComponent :: - * `FeatureDiscretizer` takes a column with continuous features and outputs a column with binned - * categorical features. + * Params for [[QuantileDiscretizer]] and [[Bucketizer]]. */ -@AlphaComponent -class FeatureDiscretizer extends Transformer with HasInputCol with HasOutputCol { +private[feature] trait QuantileDiscretizerBase extends Params with HasInputCol with HasOutputCol { /** - * Number of bins to collect data points, which should be a positive integer. + * Number of buckets to collect data points, which should be a positive integer. * @group param */ - val numBins = new IntParam(this, "numBins", - "Number of bins to collect data points, which should be a positive integer.") - setDefault(numBins -> 1) + val numBuckets = new IntParam(this, "numBuckets", + "Number of buckets to collect data points, which should be a positive integer.") + setDefault(numBuckets -> 1) /** @group getParam */ - def getNumBins: Int = getOrDefault(numBins) + def getNumBuckets: Int = getOrDefault(numBuckets) + + /** + * Validate and transform the input schema. + */ + protected def validateAndTransformSchema(schema: StructType, paramMap: ParamMap): StructType = { + val map = extractParamMap(paramMap) + SchemaUtils.checkColumnType(schema, map(inputCol), DoubleType) + val inputFields = schema.fields + val outputColName = map(outputCol) + require(inputFields.forall(_.name != outputColName), + s"Output column $outputColName already exists.") + val attr = NominalAttribute.defaultAttr.withName(outputColName) + val outputFields = inputFields :+ attr.toStructField() + StructType(outputFields) + } +} + +/** + * :: AlphaComponent :: + * `QuantileDiscretizer` takes a column with continuous features and outputs a column with binned + * categorical features. + */ +@AlphaComponent +final class QuantileDiscretizer extends Estimator[Bucketizer] with QuantileDiscretizerBase { /** @group setParam */ - def setNumBins(value: Int): this.type = set(numBins, value) + def setNumBuckets(value: Int): this.type = set(numBuckets, value) /** @group setParam */ def setInputCol(value: String): this.type = set(inputCol, value) @@ -61,50 +82,19 @@ class FeatureDiscretizer extends Transformer with HasInputCol with HasOutputCol override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { val map = extractParamMap(paramMap) - assert(map(numBins) >= 1, "Number of bins should be a positive integer.") - SchemaUtils.checkColumnType(schema, map(inputCol), DoubleType) - val inputFields = schema.fields - val outputColName = map(outputCol) - require(inputFields.forall(_.name != outputColName), - s"Output column $outputColName already exists.") - val attr = NominalAttribute.defaultAttr.withName(outputColName) - val outputFields = inputFields :+ attr.toStructField() - StructType(outputFields) + assert(map(numBuckets) >= 1, "Number of bins should be a positive integer.") + validateAndTransformSchema(schema, paramMap) } - override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { + override def fit(dataset: DataFrame, paramMap: ParamMap): Bucketizer = { transformSchema(dataset.schema, paramMap) val map = extractParamMap(paramMap) val input = dataset.select(map(inputCol)).map { case Row(feature: Double) => feature } - val samples = getSampledInput(input, map(numBins)) - val splits = findSplits(samples, map(numBins) - 1) - val discretizer = udf { feature: Double => binarySearchForBins(splits, feature) } - val outputColName = map(outputCol) - val metadata = NominalAttribute.defaultAttr - .withName(outputColName).withValues(splits.map(_.toString)).toMetadata() - dataset.select(col("*"), - discretizer(dataset(map(inputCol))).as(outputColName, metadata)) - } - - /** - * Binary searching in several bins to place each data point. - */ - private def binarySearchForBins(splits: Array[Double], feature: Double): Double = { - val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) - var left = 0 - var right = wrappedSplits.length - 2 - while (left <= right) { - val mid = left + (right - left) / 2 - val split = wrappedSplits(mid) - if ((feature > split) && (feature <= wrappedSplits(mid + 1))) { - return mid - } else if (feature <= split) { - right = mid - 1 - } else { - left = mid + 1 - } - } - -1 + val samples = getSampledInput(input, map(numBuckets)) + val splits = findSplits(samples, map(numBuckets) - 1) + val bucketizer = new Bucketizer(this, map, splits) + Params.inheritValues(map, this, bucketizer) + bucketizer } /** @@ -159,3 +149,57 @@ class FeatureDiscretizer extends Transformer with HasInputCol with HasOutputCol } } +/** + * :: AlphaComponent :: + * Model fitted by [[QuantileDiscretizer]]. + */ +@AlphaComponent +class Bucketizer private[ml] ( + override val parent: QuantileDiscretizer, + override val fittingParamMap: ParamMap, + splits: Array[Double]) + extends Model[Bucketizer] with QuantileDiscretizerBase { + + /** @group setParam */ + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + def setOutputCol(value: String): this.type = set(outputCol, value) + + override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { + transformSchema(dataset.schema, paramMap, logging = true) + val map = extractParamMap(paramMap) + val discretizer = udf { feature: Double => binarySearchForBins(splits, feature) } + val outputColName = map(outputCol) + val metadata = NominalAttribute.defaultAttr + .withName(outputColName).withValues(splits.map(_.toString)).toMetadata() + dataset.select(col("*"), + discretizer(dataset(map(inputCol))).as(outputColName, metadata)) + } + + override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + validateAndTransformSchema(schema, paramMap) + } + + /** + * Binary searching in several bins to place each data point. + */ + private def binarySearchForBins(splits: Array[Double], feature: Double): Double = { + val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) + var left = 0 + var right = wrappedSplits.length - 2 + while (left <= right) { + val mid = left + (right - left) / 2 + val split = wrappedSplits(mid) + if ((feature > split) && (feature <= wrappedSplits(mid + 1))) { + return mid + } else if (feature <= split) { + right = mid - 1 + } else { + left = mid + 1 + } + } + -1 + } +} + diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala similarity index 86% rename from mllib/src/test/scala/org/apache/spark/ml/feature/FeatureDiscretizerSuite.scala rename to mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala index fe6bc8e9b8487..938e5ef7ef469 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala @@ -25,9 +25,9 @@ import org.apache.spark.ml.attribute.{Attribute, NominalAttribute} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{Row, SQLContext} -class FeatureDiscretizerSuite extends FunSuite with MLlibTestSparkContext { +class QuantileDiscretizerSuite extends FunSuite with MLlibTestSparkContext { - test("Test feature discretizer") { + test("Test quantile discretizer") { val sqlContext = new SQLContext(sc) import sqlContext.implicits._ @@ -37,12 +37,13 @@ class FeatureDiscretizerSuite extends FunSuite with MLlibTestSparkContext { val df = sc.parallelize(data.zip(result)).toDF("data", "expected") - val discretizer = new FeatureDiscretizer() + val discretizer = new QuantileDiscretizer() .setInputCol("data") .setOutputCol("result") - .setNumBins(3) + .setNumBuckets(3) - val res = discretizer.transform(df) + val bucketizer = discretizer.fit(df) + val res = bucketizer.transform(df) res.select("expected", "result").collect().foreach { case Row(expected: Double, result: Double) => assert(expected == result) } From 38f73f28b73f4bbd371b901b864edd4cacd29642 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 7 May 2015 22:45:25 +0800 Subject: [PATCH 03/12] take Bucketizer as model --- .../ml/feature/QuantileDiscretizer.scala | 106 ++++-------------- 1 file changed, 19 insertions(+), 87 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index cf43be9a71d64..538bd56f5e750 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -23,16 +23,15 @@ import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml._ import org.apache.spark.ml.attribute.NominalAttribute import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} -import org.apache.spark.ml.param.{IntParam, ParamMap, _} +import org.apache.spark.ml.param.{IntParam, _} import org.apache.spark.ml.util.SchemaUtils import org.apache.spark.rdd.RDD -import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DoubleType, StructType} import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.util.random.XORShiftRandom /** - * Params for [[QuantileDiscretizer]] and [[Bucketizer]]. + * Params for [[QuantileDiscretizer]]. */ private[feature] trait QuantileDiscretizerBase extends Params with HasInputCol with HasOutputCol { @@ -41,26 +40,12 @@ private[feature] trait QuantileDiscretizerBase extends Params with HasInputCol w * @group param */ val numBuckets = new IntParam(this, "numBuckets", - "Number of buckets to collect data points, which should be a positive integer.") - setDefault(numBuckets -> 1) + "Number of buckets to collect data points, which should be a positive integer.", + ParamValidators.gtEq(2)) + setDefault(numBuckets -> 2) /** @group getParam */ def getNumBuckets: Int = getOrDefault(numBuckets) - - /** - * Validate and transform the input schema. - */ - protected def validateAndTransformSchema(schema: StructType, paramMap: ParamMap): StructType = { - val map = extractParamMap(paramMap) - SchemaUtils.checkColumnType(schema, map(inputCol), DoubleType) - val inputFields = schema.fields - val outputColName = map(outputCol) - require(inputFields.forall(_.name != outputColName), - s"Output column $outputColName already exists.") - val attr = NominalAttribute.defaultAttr.withName(outputColName) - val outputFields = inputFields :+ attr.toStructField() - StructType(outputFields) - } } /** @@ -80,21 +65,22 @@ final class QuantileDiscretizer extends Estimator[Bucketizer] with QuantileDiscr /** @group setParam */ def setOutputCol(value: String): this.type = set(outputCol, value) - override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { - val map = extractParamMap(paramMap) - assert(map(numBuckets) >= 1, "Number of bins should be a positive integer.") - validateAndTransformSchema(schema, paramMap) + override def transformSchema(schema: StructType): StructType = { + SchemaUtils.checkColumnType(schema, $(inputCol), DoubleType) + val inputFields = schema.fields + require(inputFields.forall(_.name != $(outputCol)), + s"Output column ${$(outputCol)} already exists.") + val attr = NominalAttribute.defaultAttr.withName($(outputCol)) + val outputFields = inputFields :+ attr.toStructField() + StructType(outputFields) } - override def fit(dataset: DataFrame, paramMap: ParamMap): Bucketizer = { - transformSchema(dataset.schema, paramMap) - val map = extractParamMap(paramMap) - val input = dataset.select(map(inputCol)).map { case Row(feature: Double) => feature } - val samples = getSampledInput(input, map(numBuckets)) - val splits = findSplits(samples, map(numBuckets) - 1) - val bucketizer = new Bucketizer(this, map, splits) - Params.inheritValues(map, this, bucketizer) - bucketizer + override def fit(dataset: DataFrame): Bucketizer = { + val input = dataset.select($(inputCol)).map { case Row(feature: Double) => feature } + val samples = getSampledInput(input, $(numBuckets)) + val splits = findSplits(samples, $(numBuckets) - 1) + val bucketizer = new Bucketizer(this).setBuckets(splits) + copyValues(bucketizer) } /** @@ -149,57 +135,3 @@ final class QuantileDiscretizer extends Estimator[Bucketizer] with QuantileDiscr } } -/** - * :: AlphaComponent :: - * Model fitted by [[QuantileDiscretizer]]. - */ -@AlphaComponent -class Bucketizer private[ml] ( - override val parent: QuantileDiscretizer, - override val fittingParamMap: ParamMap, - splits: Array[Double]) - extends Model[Bucketizer] with QuantileDiscretizerBase { - - /** @group setParam */ - def setInputCol(value: String): this.type = set(inputCol, value) - - /** @group setParam */ - def setOutputCol(value: String): this.type = set(outputCol, value) - - override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { - transformSchema(dataset.schema, paramMap, logging = true) - val map = extractParamMap(paramMap) - val discretizer = udf { feature: Double => binarySearchForBins(splits, feature) } - val outputColName = map(outputCol) - val metadata = NominalAttribute.defaultAttr - .withName(outputColName).withValues(splits.map(_.toString)).toMetadata() - dataset.select(col("*"), - discretizer(dataset(map(inputCol))).as(outputColName, metadata)) - } - - override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { - validateAndTransformSchema(schema, paramMap) - } - - /** - * Binary searching in several bins to place each data point. - */ - private def binarySearchForBins(splits: Array[Double], feature: Double): Double = { - val wrappedSplits = Array(Double.MinValue) ++ splits ++ Array(Double.MaxValue) - var left = 0 - var right = wrappedSplits.length - 2 - while (left <= right) { - val mid = left + (right - left) / 2 - val split = wrappedSplits(mid) - if ((feature > split) && (feature <= wrappedSplits(mid + 1))) { - return mid - } else if (feature <= split) { - right = mid - 1 - } else { - left = mid + 1 - } - } - -1 - } -} - From 5ffa1676e809942c9c148dcca4ac990898d6d141 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Fri, 15 May 2015 16:57:14 +0800 Subject: [PATCH 04/12] merge with Bucketizer --- .../spark/ml/feature/QuantileDiscretizer.scala | 12 ++++++++---- .../spark/ml/feature/QuantileDiscretizerSuite.scala | 8 ++++++-- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index 538bd56f5e750..6f5fb0df7009a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -24,7 +24,7 @@ import org.apache.spark.ml._ import org.apache.spark.ml.attribute.NominalAttribute import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} import org.apache.spark.ml.param.{IntParam, _} -import org.apache.spark.ml.util.SchemaUtils +import org.apache.spark.ml.util._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{DoubleType, StructType} import org.apache.spark.sql.{DataFrame, Row} @@ -54,7 +54,10 @@ private[feature] trait QuantileDiscretizerBase extends Params with HasInputCol w * categorical features. */ @AlphaComponent -final class QuantileDiscretizer extends Estimator[Bucketizer] with QuantileDiscretizerBase { +final class QuantileDiscretizer(override val uid: String) + extends Estimator[Bucketizer] with QuantileDiscretizerBase { + + def this() = this(Identifiable.randomUID("QuantileDiscretizer")) /** @group setParam */ def setNumBuckets(value: Int): this.type = set(numBuckets, value) @@ -78,8 +81,9 @@ final class QuantileDiscretizer extends Estimator[Bucketizer] with QuantileDiscr override def fit(dataset: DataFrame): Bucketizer = { val input = dataset.select($(inputCol)).map { case Row(feature: Double) => feature } val samples = getSampledInput(input, $(numBuckets)) - val splits = findSplits(samples, $(numBuckets) - 1) - val bucketizer = new Bucketizer(this).setBuckets(splits) + val splits = Array(Double.NegativeInfinity) ++ findSplits(samples, $(numBuckets) - 1) ++ + Array(Double.PositiveInfinity) + val bucketizer = new Bucketizer(uid).setSplits(splits) copyValues(bucketizer) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala index 938e5ef7ef469..f0d67b64154e4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala @@ -33,7 +33,7 @@ class QuantileDiscretizerSuite extends FunSuite with MLlibTestSparkContext { val random = new Random(47) val data = Array.fill[Double](10)(random.nextDouble()) - val result = Array[Double](2, 1, 0, 0, 1, 1, 1, 0, 2, 2) + val result = Array[Double](2, 2, 0, 1, 1, 1, 1, 0, 2, 2) val df = sc.parallelize(data.zip(result)).toDF("data", "expected") @@ -44,11 +44,15 @@ class QuantileDiscretizerSuite extends FunSuite with MLlibTestSparkContext { val bucketizer = discretizer.fit(df) val res = bucketizer.transform(df) + res.select("expected", "result").collect().foreach { case Row(expected: Double, result: Double) => assert(expected == result) } val attr = Attribute.fromStructField(res.schema("result")).asInstanceOf[NominalAttribute] - assert(attr.values.get === Array("0.18847866977771732","0.5309454508634242")) + assert(attr.values.get === Array( + "-Infinity, 0.18847866977771732", + "0.18847866977771732, 0.5309454508634242", + "0.5309454508634242, Infinity")) } } From 74f1b6576d2c760d31e084694c417d36c138b723 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Tue, 22 Sep 2015 20:17:54 -0700 Subject: [PATCH 05/12] fix find splits --- .../ml/feature/QuantileDiscretizer.scala | 61 ++++++++++++------- .../ml/feature/QuantileDiscretizerSuite.scala | 7 +++ 2 files changed, 46 insertions(+), 22 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index 6f5fb0df7009a..2e90b75803369 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -25,7 +25,6 @@ import org.apache.spark.ml.attribute.NominalAttribute import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} import org.apache.spark.ml.param.{IntParam, _} import org.apache.spark.ml.util._ -import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{DoubleType, StructType} import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.util.random.XORShiftRandom @@ -36,11 +35,12 @@ import org.apache.spark.util.random.XORShiftRandom private[feature] trait QuantileDiscretizerBase extends Params with HasInputCol with HasOutputCol { /** - * Number of buckets to collect data points, which should be a positive integer. + * Maximum number of buckets (quantiles, or categories) into which data points are grouped. Must + * be >= 2. * @group param */ - val numBuckets = new IntParam(this, "numBuckets", - "Number of buckets to collect data points, which should be a positive integer.", + val numBuckets = new IntParam(this, "numBuckets", "Maximum number of buckets (quantiles, or " + + "categories) into which data points are grouped. Must be >= 2.", ParamValidators.gtEq(2)) setDefault(numBuckets -> 2) @@ -51,13 +51,16 @@ private[feature] trait QuantileDiscretizerBase extends Params with HasInputCol w /** * :: AlphaComponent :: * `QuantileDiscretizer` takes a column with continuous features and outputs a column with binned - * categorical features. + * categorical features. The bin ranges are chosen by taking a sample of the data and dividing it + * into roughly equal parts. The lower and upper bin bounds will be -Infinity and +Infinity, + * covering all real values. This attempts to find numBuckets partitions based on a sample of data, + * but it may find fewer depending on the data sample values. */ @AlphaComponent final class QuantileDiscretizer(override val uid: String) extends Estimator[Bucketizer] with QuantileDiscretizerBase { - def this() = this(Identifiable.randomUID("QuantileDiscretizer")) + def this() = this(Identifiable.randomUID("quantileDiscretizer")) /** @group setParam */ def setNumBuckets(value: Int): this.type = set(numBuckets, value) @@ -79,18 +82,34 @@ final class QuantileDiscretizer(override val uid: String) } override def fit(dataset: DataFrame): Bucketizer = { - val input = dataset.select($(inputCol)).map { case Row(feature: Double) => feature } - val samples = getSampledInput(input, $(numBuckets)) - val splits = Array(Double.NegativeInfinity) ++ findSplits(samples, $(numBuckets) - 1) ++ - Array(Double.PositiveInfinity) + val samples = QuantileDiscretizer.getSampledInput(dataset.select($(inputCol)), $(numBuckets)) + .map { case Row(feature: Double) => feature } + val splitCandidates = QuantileDiscretizer.findSplits(samples, $(numBuckets) - 1) + val splits = if (splitCandidates.size == 0) { + logInfo("Failed to find any suitable splits, using 0 as default split point.") + Array(Double.NegativeInfinity, 0, Double.PositiveInfinity) + } else { + if (splitCandidates.head == Double.NegativeInfinity + && splitCandidates.last == Double.PositiveInfinity) { + splitCandidates + } else if (splitCandidates.head == Double.NegativeInfinity) { + splitCandidates ++ Array(Double.PositiveInfinity) + } else if (splitCandidates.last == Double.PositiveInfinity) { + Array(Double.NegativeInfinity) ++ splitCandidates + } else { + Array(Double.NegativeInfinity) ++ splitCandidates ++ Array(Double.PositiveInfinity) + } + } val bucketizer = new Bucketizer(uid).setSplits(splits) copyValues(bucketizer) } +} +private[feature] object QuantileDiscretizer { /** * Sampling from the given dataset to collect quantile statistics. */ - private def getSampledInput(dataset: RDD[Double], numBins: Int): Array[Double] = { + def getSampledInput(dataset: DataFrame, numBins: Int): Array[Row] = { val totalSamples = dataset.count() assert(totalSamples > 0) val requiredSamples = math.max(numBins * numBins, 10000) @@ -101,33 +120,31 @@ final class QuantileDiscretizer(override val uid: String) /** * Compute split points with respect to the sample distribution. */ - private def findSplits(samples: Array[Double], numSplits: Int): Array[Double] = { + def findSplits(samples: Array[Double], numSplits: Int): Array[Double] = { val valueCountMap = samples.foldLeft(Map.empty[Double, Int]) { (m, x) => m + ((x, m.getOrElse(x, 0) + 1)) } - val valueCounts = valueCountMap.toSeq.sortBy(_._1).toArray - val possibleSplits = valueCounts.length + val valueCounts = valueCountMap.toSeq.sortBy(_._1).toArray ++ Array((Double.MaxValue, 1)) + val possibleSplits = valueCounts.length - 1 if (possibleSplits <= numSplits) { - valueCounts.map(_._1) + valueCounts.dropRight(1).map(_._1) } else { val stride: Double = samples.length.toDouble / (numSplits + 1) val splitsBuilder = mutable.ArrayBuilder.make[Double] var index = 1 // currentCount: sum of counts of values that have been visited var currentCount = valueCounts(0)._2 - // targetCount: target value for `currentCount`. - // If `currentCount` is closest value to `targetCount`, - // then current value is a split threshold. - // After finding a split threshold, `targetCount` is added by stride. + // targetCount: target value for `currentCount`. If `currentCount` is closest value to + // `targetCount`, then current value is a split threshold. After finding a split threshold, + // `targetCount` is added by stride. var targetCount = stride while (index < valueCounts.length) { val previousCount = currentCount currentCount += valueCounts(index)._2 val previousGap = math.abs(previousCount - targetCount) val currentGap = math.abs(currentCount - targetCount) - // If adding count of current value to currentCount - // makes the gap between currentCount and targetCount smaller, - // previous value is a split threshold. + // If adding count of current value to currentCount makes the gap between currentCount and + // targetCount smaller, previous value is a split threshold. if (previousGap < currentGap) { splitsBuilder += valueCounts(index - 1)._1 targetCount += stride diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala index f0d67b64154e4..2bd5adc9b742f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala @@ -55,4 +55,11 @@ class QuantileDiscretizerSuite extends FunSuite with MLlibTestSparkContext { "0.18847866977771732, 0.5309454508634242", "0.5309454508634242, Infinity")) } + + test("Test find splits") { + val sample = Array[Double](1, 2, 3, 3, 3, 3, 3, 3, 3) + val numSplits = 2 + val res = QuantileDiscretizer.findSplits(sample, numSplits) + println(res.mkString(", ")) + } } From b2bd98f81590c690e02007a3aaa42afa6832aa90 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Tue, 22 Sep 2015 21:30:29 -0700 Subject: [PATCH 06/12] add more tests --- .../ml/feature/QuantileDiscretizer.scala | 2 +- .../ml/feature/QuantileDiscretizerSuite.scala | 75 ++++++++++++------- 2 files changed, 48 insertions(+), 29 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index 2e90b75803369..0169683b86d2e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -129,7 +129,7 @@ private[feature] object QuantileDiscretizer { if (possibleSplits <= numSplits) { valueCounts.dropRight(1).map(_._1) } else { - val stride: Double = samples.length.toDouble / (numSplits + 1) + val stride: Double = math.ceil(samples.length.toDouble / (numSplits + 1)) val splitsBuilder = mutable.ArrayBuilder.make[Double] var index = 1 // currentCount: sum of counts of values that have been visited diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala index 2bd5adc9b742f..87e8b517c8443 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala @@ -17,49 +17,68 @@ package org.apache.spark.ml.feature -import scala.util.Random - import org.scalatest.FunSuite +import org.apache.spark.SparkContext import org.apache.spark.ml.attribute.{Attribute, NominalAttribute} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{Row, SQLContext} class QuantileDiscretizerSuite extends FunSuite with MLlibTestSparkContext { + import org.apache.spark.ml.feature.QuantileDiscretizerSuite._ test("Test quantile discretizer") { - val sqlContext = new SQLContext(sc) - import sqlContext.implicits._ + checkDiscretizedData(sc, + Array[Double](1, 2, 3, 3, 3, 3, 3, 3, 3), + 10, + Array[Double](1, 2, 3, 3, 3, 3, 3, 3, 3), + Array("-Infinity, 1.0", "1.0, 2.0", "2.0, 3.0", "3.0, Infinity")) + + checkDiscretizedData(sc, + Array[Double](1, 2, 3, 3, 3, 3, 3, 3, 3), + 4, + Array[Double](1, 2, 3, 3, 3, 3, 3, 3, 3), + Array("-Infinity, 1.0", "1.0, 2.0", "2.0, 3.0", "3.0, Infinity")) + + checkDiscretizedData(sc, + Array[Double](1, 2, 3, 3, 3, 3, 3, 3, 3), + 3, + Array[Double](0, 1, 2, 2, 2, 2, 2, 2, 2), + Array("-Infinity, 2.0", "2.0, 3.0", "3.0, Infinity")) - val random = new Random(47) - val data = Array.fill[Double](10)(random.nextDouble()) - val result = Array[Double](2, 2, 0, 1, 1, 1, 1, 0, 2, 2) + checkDiscretizedData(sc, + Array[Double](1, 2, 3, 3, 3, 3, 3, 3, 3), + 2, + Array[Double](0, 1, 1, 1, 1, 1, 1, 1, 1), + Array("-Infinity, 2.0", "2.0, Infinity")) - val df = sc.parallelize(data.zip(result)).toDF("data", "expected") + } +} - val discretizer = new QuantileDiscretizer() - .setInputCol("data") - .setOutputCol("result") - .setNumBuckets(3) +private object QuantileDiscretizerSuite extends FunSuite { - val bucketizer = discretizer.fit(df) - val res = bucketizer.transform(df) + def checkDiscretizedData( + sc: SparkContext, + data: Array[Double], + numBucket: Int, + expectedResult: Array[Double], + expectedAttrs: Array[String]): Unit = { + val sqlCtx = new SQLContext(sc) + import sqlCtx.implicits._ - res.select("expected", "result").collect().foreach { - case Row(expected: Double, result: Double) => assert(expected == result) - } + val df = sc.parallelize(data.map(Tuple1.apply)).toDF("input") + val discretizer = new QuantileDiscretizer().setInputCol("input").setOutputCol("result") + .setNumBuckets(numBucket) + val result = discretizer.fit(df).transform(df) - val attr = Attribute.fromStructField(res.schema("result")).asInstanceOf[NominalAttribute] - assert(attr.values.get === Array( - "-Infinity, 0.18847866977771732", - "0.18847866977771732, 0.5309454508634242", - "0.5309454508634242, Infinity")) - } + val transformedFeatures = result.select("result").collect() + .map { case Row(transformedFeature: Double) => transformedFeature } + val transformedAttrs = Attribute.fromStructField(result.schema("result")) + .asInstanceOf[NominalAttribute].values.get - test("Test find splits") { - val sample = Array[Double](1, 2, 3, 3, 3, 3, 3, 3, 3) - val numSplits = 2 - val res = QuantileDiscretizer.findSplits(sample, numSplits) - println(res.mkString(", ")) + assert(transformedFeatures === expectedResult, + "Transformed features do not equal expected features.") + assert(transformedAttrs === expectedAttrs, + "Transformed attributes do not equal expected attributes.") } } From 7fadccdc91e8b772a2600d099bed84f58a3196d0 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Tue, 22 Sep 2015 21:47:15 -0700 Subject: [PATCH 07/12] merge with current master --- .../org/apache/spark/ml/feature/QuantileDiscretizer.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index 0169683b86d2e..b3e58cf7eb1eb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -49,7 +49,7 @@ private[feature] trait QuantileDiscretizerBase extends Params with HasInputCol w } /** - * :: AlphaComponent :: + * :: Experimental :: * `QuantileDiscretizer` takes a column with continuous features and outputs a column with binned * categorical features. The bin ranges are chosen by taking a sample of the data and dividing it * into roughly equal parts. The lower and upper bin bounds will be -Infinity and +Infinity, @@ -103,6 +103,8 @@ final class QuantileDiscretizer(override val uid: String) val bucketizer = new Bucketizer(uid).setSplits(splits) copyValues(bucketizer) } + + override def copy(extra: ParamMap): QuantileDiscretizer = defaultCopy(extra) } private[feature] object QuantileDiscretizer { From 99d46eec48d7b734227da87626829730db410508 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Tue, 22 Sep 2015 21:55:35 -0700 Subject: [PATCH 08/12] fix minor error --- .../org/apache/spark/ml/feature/QuantileDiscretizer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index b3e58cf7eb1eb..b7e85220c8b44 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.feature import scala.collection.mutable -import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.annotation.Experimental import org.apache.spark.ml._ import org.apache.spark.ml.attribute.NominalAttribute import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} @@ -56,7 +56,7 @@ private[feature] trait QuantileDiscretizerBase extends Params with HasInputCol w * covering all real values. This attempts to find numBuckets partitions based on a sample of data, * but it may find fewer depending on the data sample values. */ -@AlphaComponent +@Experimental final class QuantileDiscretizer(override val uid: String) extends Estimator[Bucketizer] with QuantileDiscretizerBase { From 954d7b928ef289d0b48956ea3b52ff49ec014dbc Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Tue, 22 Sep 2015 22:34:03 -0700 Subject: [PATCH 09/12] fix style error --- .../spark/ml/feature/QuantileDiscretizerSuite.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala index 87e8b517c8443..e02427f39a966 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala @@ -17,14 +17,12 @@ package org.apache.spark.ml.feature -import org.scalatest.FunSuite - -import org.apache.spark.SparkContext import org.apache.spark.ml.attribute.{Attribute, NominalAttribute} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.{SparkContext, SparkFunSuite} -class QuantileDiscretizerSuite extends FunSuite with MLlibTestSparkContext { +class QuantileDiscretizerSuite extends SparkFunSuite with MLlibTestSparkContext { import org.apache.spark.ml.feature.QuantileDiscretizerSuite._ test("Test quantile discretizer") { @@ -55,7 +53,7 @@ class QuantileDiscretizerSuite extends FunSuite with MLlibTestSparkContext { } } -private object QuantileDiscretizerSuite extends FunSuite { +private object QuantileDiscretizerSuite extends SparkFunSuite { def checkDiscretizedData( sc: SparkContext, From b5d90e7757d8e725a05ef95ad604938aa80e393f Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 24 Sep 2015 13:15:17 -0700 Subject: [PATCH 10/12] minor changes --- .../ml/feature/QuantileDiscretizer.scala | 56 ++++++++++++------- .../ml/feature/QuantileDiscretizerSuite.scala | 2 +- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index b7e85220c8b44..f0be45674173d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -17,6 +17,8 @@ package org.apache.spark.ml.feature +import org.apache.spark.Logging + import scala.collection.mutable import org.apache.spark.annotation.Experimental @@ -37,6 +39,7 @@ private[feature] trait QuantileDiscretizerBase extends Params with HasInputCol w /** * Maximum number of buckets (quantiles, or categories) into which data points are grouped. Must * be >= 2. + * default: 2 * @group param */ val numBuckets = new IntParam(this, "numBuckets", "Maximum number of buckets (quantiles, or " + @@ -84,22 +87,8 @@ final class QuantileDiscretizer(override val uid: String) override def fit(dataset: DataFrame): Bucketizer = { val samples = QuantileDiscretizer.getSampledInput(dataset.select($(inputCol)), $(numBuckets)) .map { case Row(feature: Double) => feature } - val splitCandidates = QuantileDiscretizer.findSplits(samples, $(numBuckets) - 1) - val splits = if (splitCandidates.size == 0) { - logInfo("Failed to find any suitable splits, using 0 as default split point.") - Array(Double.NegativeInfinity, 0, Double.PositiveInfinity) - } else { - if (splitCandidates.head == Double.NegativeInfinity - && splitCandidates.last == Double.PositiveInfinity) { - splitCandidates - } else if (splitCandidates.head == Double.NegativeInfinity) { - splitCandidates ++ Array(Double.PositiveInfinity) - } else if (splitCandidates.last == Double.PositiveInfinity) { - Array(Double.NegativeInfinity) ++ splitCandidates - } else { - Array(Double.NegativeInfinity) ++ splitCandidates ++ Array(Double.PositiveInfinity) - } - } + val splitCandidates = QuantileDiscretizer.findSplitCandidates(samples, $(numBuckets) - 1) + val splits = QuantileDiscretizer.getSplits(splitCandidates) val bucketizer = new Bucketizer(uid).setSplits(splits) copyValues(bucketizer) } @@ -107,13 +96,14 @@ final class QuantileDiscretizer(override val uid: String) override def copy(extra: ParamMap): QuantileDiscretizer = defaultCopy(extra) } -private[feature] object QuantileDiscretizer { +private[feature] object QuantileDiscretizer extends Logging { /** * Sampling from the given dataset to collect quantile statistics. */ def getSampledInput(dataset: DataFrame, numBins: Int): Array[Row] = { val totalSamples = dataset.count() - assert(totalSamples > 0) + require(totalSamples > 0, + "QuantileDiscretizer requires non-empty input dataset but was given an empty input.") val requiredSamples = math.max(numBins * numBins, 10000) val fraction = math.min(requiredSamples / dataset.count(), 1.0) dataset.sample(withReplacement = false, fraction, new XORShiftRandom().nextInt()).collect() @@ -122,7 +112,7 @@ private[feature] object QuantileDiscretizer { /** * Compute split points with respect to the sample distribution. */ - def findSplits(samples: Array[Double], numSplits: Int): Array[Double] = { + def findSplitCandidates(samples: Array[Double], numSplits: Int): Array[Double] = { val valueCountMap = samples.foldLeft(Map.empty[Double, Int]) { (m, x) => m + ((x, m.getOrElse(x, 0) + 1)) } @@ -156,5 +146,33 @@ private[feature] object QuantileDiscretizer { splitsBuilder.result() } } + + /** + * Regulate split candidates to effective splits, such as adding positive/negative infinity in + * both sides, or using default split value in case of only ineffectiveness split candidates are + * found. + */ + def getSplits(splitCandidates: Array[Double]): Array[Double] = { + if (splitCandidates.size == 0) { + logInfo("Failed to find any effective splits, using 0 as default split point.") + Array(Double.NegativeInfinity, 0, Double.PositiveInfinity) + } else { + if (splitCandidates.head == Double.NegativeInfinity + && splitCandidates.last == Double.PositiveInfinity) { + if (splitCandidates.size == 2) { + logInfo("Failed to find any effective splits, using 0 as default split point.") + Array(splitCandidates.head, 0, splitCandidates.last) + } else { + splitCandidates + } + } else if (splitCandidates.head == Double.NegativeInfinity) { + splitCandidates ++ Array(Double.PositiveInfinity) + } else if (splitCandidates.last == Double.PositiveInfinity) { + Array(Double.NegativeInfinity) ++ splitCandidates + } else { + Array(Double.NegativeInfinity) ++ splitCandidates ++ Array(Double.PositiveInfinity) + } + } + } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala index e02427f39a966..90485f39c7a88 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala @@ -61,7 +61,7 @@ private object QuantileDiscretizerSuite extends SparkFunSuite { numBucket: Int, expectedResult: Array[Double], expectedAttrs: Array[String]): Unit = { - val sqlCtx = new SQLContext(sc) + val sqlCtx = SQLContext.getOrCreate(sc) import sqlCtx.implicits._ val df = sc.parallelize(data.map(Tuple1.apply)).toDF("input") From 0e807bc4bbe2a586df32d7033309c98468111a73 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 24 Sep 2015 13:54:51 -0700 Subject: [PATCH 11/12] add get splits test --- .../ml/feature/QuantileDiscretizer.scala | 40 +++++++++---------- .../ml/feature/QuantileDiscretizerSuite.scala | 16 ++++++++ 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index f0be45674173d..bf23f09023053 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -87,8 +87,8 @@ final class QuantileDiscretizer(override val uid: String) override def fit(dataset: DataFrame): Bucketizer = { val samples = QuantileDiscretizer.getSampledInput(dataset.select($(inputCol)), $(numBuckets)) .map { case Row(feature: Double) => feature } - val splitCandidates = QuantileDiscretizer.findSplitCandidates(samples, $(numBuckets) - 1) - val splits = QuantileDiscretizer.getSplits(splitCandidates) + val candidates = QuantileDiscretizer.findSplitCandidates(samples, $(numBuckets) - 1) + val splits = QuantileDiscretizer.getSplits(candidates) val bucketizer = new Bucketizer(uid).setSplits(splits) copyValues(bucketizer) } @@ -152,26 +152,26 @@ private[feature] object QuantileDiscretizer extends Logging { * both sides, or using default split value in case of only ineffectiveness split candidates are * found. */ - def getSplits(splitCandidates: Array[Double]): Array[Double] = { - if (splitCandidates.size == 0) { - logInfo("Failed to find any effective splits, using 0 as default split point.") - Array(Double.NegativeInfinity, 0, Double.PositiveInfinity) - } else { - if (splitCandidates.head == Double.NegativeInfinity - && splitCandidates.last == Double.PositiveInfinity) { - if (splitCandidates.size == 2) { - logInfo("Failed to find any effective splits, using 0 as default split point.") - Array(splitCandidates.head, 0, splitCandidates.last) - } else { - splitCandidates - } - } else if (splitCandidates.head == Double.NegativeInfinity) { - splitCandidates ++ Array(Double.PositiveInfinity) - } else if (splitCandidates.last == Double.PositiveInfinity) { - Array(Double.NegativeInfinity) ++ splitCandidates + def getSplits(candidates: Array[Double]): Array[Double] = { + val effectiveValues = if (candidates.size != 0) { + if (candidates.head == Double.NegativeInfinity + && candidates.last == Double.PositiveInfinity) { + candidates.drop(1).dropRight(1) + } else if (candidates.head == Double.NegativeInfinity) { + candidates.drop(1) + } else if (candidates.last == Double.PositiveInfinity) { + candidates.dropRight(1) } else { - Array(Double.NegativeInfinity) ++ splitCandidates ++ Array(Double.PositiveInfinity) + candidates } + } else { + candidates + } + + if (effectiveValues.size == 0) { + Array(Double.NegativeInfinity, 0, Double.PositiveInfinity) + } else { + Array(Double.NegativeInfinity) ++ effectiveValues ++ Array(Double.PositiveInfinity) } } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala index 90485f39c7a88..b2bdd8935f903 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala @@ -51,6 +51,22 @@ class QuantileDiscretizerSuite extends SparkFunSuite with MLlibTestSparkContext Array("-Infinity, 2.0", "2.0, Infinity")) } + + test("Test getting splits") { + val splitTestPoints = Array( + Array[Double]() -> Array(Double.NegativeInfinity, 0, Double.PositiveInfinity), + Array(Double.NegativeInfinity) -> Array(Double.NegativeInfinity, 0, Double.PositiveInfinity), + Array(Double.PositiveInfinity) -> Array(Double.NegativeInfinity, 0, Double.PositiveInfinity), + Array(Double.NegativeInfinity, Double.PositiveInfinity) + -> Array(Double.NegativeInfinity, 0, Double.PositiveInfinity), + Array(0.0) -> Array(Double.NegativeInfinity, 0, Double.PositiveInfinity), + Array(1.0) -> Array(Double.NegativeInfinity, 1, Double.PositiveInfinity), + Array(0.0, 1.0) -> Array(Double.NegativeInfinity, 0, 1, Double.PositiveInfinity) + ) + for ((ori, res) <- splitTestPoints) { + assert(QuantileDiscretizer.getSplits(ori) === res, "Returned splits are invalid.") + } + } } private object QuantileDiscretizerSuite extends SparkFunSuite { From 9f878e9ea31439858ff46f3a9d3a4889721e2f47 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 1 Oct 2015 14:25:08 -0700 Subject: [PATCH 12/12] minor changes --- .../org/apache/spark/ml/feature/QuantileDiscretizer.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index bf23f09023053..46b836da9cfde 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -17,10 +17,9 @@ package org.apache.spark.ml.feature -import org.apache.spark.Logging - import scala.collection.mutable +import org.apache.spark.Logging import org.apache.spark.annotation.Experimental import org.apache.spark.ml._ import org.apache.spark.ml.attribute.NominalAttribute @@ -148,9 +147,8 @@ private[feature] object QuantileDiscretizer extends Logging { } /** - * Regulate split candidates to effective splits, such as adding positive/negative infinity in - * both sides, or using default split value in case of only ineffectiveness split candidates are - * found. + * Adjust split candidates to proper splits by: adding positive/negative infinity to both sides as + * needed, and adding a default split value of 0 if no good candidates are found. */ def getSplits(candidates: Array[Double]): Array[Double] = { val effectiveValues = if (candidates.size != 0) {