Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -58,49 +58,53 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
* @param probabilities a list of quantile probabilities
* Each number must belong to [0, 1].
* For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
* @param relativeError The relative target precision to achieve (greater or equal to 0).
* @param relativeError The relative target precision to achieve (greater than or equal to 0).
* If set to zero, the exact quantiles are computed, which could be very expensive.
* Note that values greater than 1 are accepted but give the same result as 1.
* @return the approximate quantiles at the given probabilities
*
* @note NaN values will be removed from the numerical column before calculation
* @note null and NaN values will be removed from the numerical column before calculation. If
* the dataframe is empty or all rows contain null or NaN, null is returned.
*
* @since 2.0.0
*/
def approxQuantile(
col: String,
probabilities: Array[Double],
relativeError: Double): Array[Double] = {
StatFunctions.multipleApproxQuantiles(df.select(col).na.drop(),
Seq(col), probabilities, relativeError).head.toArray
val res = approxQuantile(Array(col), probabilities, relativeError)
Option(res).map(_.head).orNull
}

/**
* Calculates the approximate quantiles of numerical columns of a DataFrame.
Copy link
Member

@HyukjinKwon HyukjinKwon Feb 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am sorry. Actually, I initially meant remove DataFrameStatFunctions leaving the method because it is in the same class. Nevertheless, FWIW, I am fine with removing this @see as is given other functions here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The * was me getting fancy to get the ScalaDoc to link to the correct single-arg method (I did test it at the time and it does work for Scala though there may be a mistake here somewhere).

It would still be good to provide a @see reference even if it does not link nicely (so the simple backtick method name as you suggested?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will add it back. Should it be approxQuantile(col:Str* approxQuantile) or approxQuantile(String, Array[Double], Double) ?

* @see [[DataFrameStatsFunctions.approxQuantile(col:Str* approxQuantile]] for
* detailed description.
* @see `approxQuantile(col:Str* approxQuantile)` for detailed description.
*
* Note that rows containing any null or NaN values values will be removed before
* calculation.
* @param cols the names of the numerical columns
* @param probabilities a list of quantile probabilities
* Each number must belong to [0, 1].
* For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
* @param relativeError The relative target precision to achieve (>= 0).
* @param relativeError The relative target precision to achieve (greater than or equal to 0).
* If set to zero, the exact quantiles are computed, which could be very expensive.
* Note that values greater than 1 are accepted but give the same result as 1.
* @return the approximate quantiles at the given probabilities of each column
*
* @note Rows containing any NaN values will be removed before calculation
* @note Rows containing any null or NaN values will be removed before calculation. If
* the dataframe is empty or all rows contain null or NaN, null is returned.
*
* @since 2.2.0
*/
def approxQuantile(
cols: Array[String],
probabilities: Array[Double],
relativeError: Double): Array[Array[Double]] = {
StatFunctions.multipleApproxQuantiles(df.select(cols.map(col): _*).na.drop(), cols,
probabilities, relativeError).map(_.toArray).toArray
// TODO: Update NaN/null handling to keep consistent with the single-column version
try {
StatFunctions.multipleApproxQuantiles(df.select(cols.map(col): _*).na.drop(), cols,
probabilities, relativeError).map(_.toArray).toArray
} catch {
case e: NoSuchElementException => null
}
}


Expand All @@ -112,7 +116,7 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
probabilities: List[Double],
relativeError: Double): java.util.List[java.util.List[Double]] = {
approxQuantile(cols.toArray, probabilities.toArray, relativeError)
.map(_.toList.asJava).toList.asJava
.map(_.toList.asJava).toList.asJava
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object StatFunctions extends Logging {
* @param probabilities a list of quantile probabilities
* Each number must belong to [0, 1].
* For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
* @param relativeError The relative target precision to achieve (>= 0).
* @param relativeError The relative target precision to achieve (greater than or equal 0).
* If set to zero, the exact quantiles are computed, which could be very expensive.
* Note that values greater than 1 are accepted but give the same result as 1.
*
Expand All @@ -60,6 +60,8 @@ object StatFunctions extends Logging {
cols: Seq[String],
probabilities: Seq[Double],
relativeError: Double): Seq[Seq[Double]] = {
require(relativeError >= 0,
s"Relative Error must be non-negative but got $relativeError")
val columns: Seq[Column] = cols.map { colName =>
val field = df.schema(colName)
require(field.dataType.isInstanceOf[NumericType],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.stat.StatFunctions
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

class DataFrameStatSuite extends QueryTest with SharedSQLContext {
import testImplicits._
Expand Down Expand Up @@ -159,16 +159,75 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
assert(math.abs(md1 - 2 * q1 * n) < error_double)
assert(math.abs(md2 - 2 * q2 * n) < error_double)
}
// test approxQuantile on NaN values
val dfNaN = Seq(Double.NaN, 1.0, Double.NaN, Double.NaN).toDF("input")
val resNaN = dfNaN.stat.approxQuantile("input", Array(q1, q2), epsilons.head)

// quantile should be in the range [0.0, 1.0]
val e = intercept[IllegalArgumentException] {
df.stat.approxQuantile(Array("singles", "doubles"), Array(q1, q2, -0.1), epsilons.head)
}
assert(e.getMessage.contains("quantile should be in the range [0.0, 1.0]"))

// relativeError should be non-negative
val e2 = intercept[IllegalArgumentException] {
df.stat.approxQuantile(Array("singles", "doubles"), Array(q1, q2), -1.0)
}
assert(e2.getMessage.contains("Relative Error must be non-negative"))

// return null if the dataset is empty
val res1 = df.selectExpr("*").limit(0)
.stat.approxQuantile("singles", Array(q1, q2), epsilons.head)
assert(res1 === null)

val res2 = df.selectExpr("*").limit(0)
.stat.approxQuantile(Array("singles", "doubles"), Array(q1, q2), epsilons.head)
assert(res2 === null)
}

test("approximate quantile 2: test relativeError greater than 1 return the same result as 1") {
val n = 1000
val df = Seq.tabulate(n)(i => (i, 2.0 * i)).toDF("singles", "doubles")

val q1 = 0.5
val q2 = 0.8
val epsilons = List(2.0, 5.0, 100.0)

val Array(single1_1) = df.stat.approxQuantile("singles", Array(q1), 1.0)
val Array(s1_1, s2_1) = df.stat.approxQuantile("singles", Array(q1, q2), 1.0)
val Array(Array(ms1_1, ms2_1), Array(md1_1, md2_1)) =
df.stat.approxQuantile(Array("singles", "doubles"), Array(q1, q2), 1.0)

for (epsilon <- epsilons) {
val Array(single1) = df.stat.approxQuantile("singles", Array(q1), epsilon)
val Array(s1, s2) = df.stat.approxQuantile("singles", Array(q1, q2), epsilon)
val Array(Array(ms1, ms2), Array(md1, md2)) =
df.stat.approxQuantile(Array("singles", "doubles"), Array(q1, q2), epsilon)
assert(single1_1 === single1)
assert(s1_1 === s1)
assert(s2_1 === s2)
assert(ms1_1 === ms1)
assert(ms2_1 === ms2)
assert(md1_1 === md1)
assert(md2_1 === md2)
}
}

test("approximate quantile 3: test on NaN and null values") {
val q1 = 0.5
val q2 = 0.8
val epsilon = 0.1
val rows = spark.sparkContext.parallelize(Seq(Row(Double.NaN, 1.0), Row(1.0, 1.0),
Row(-1.0, Double.NaN), Row(Double.NaN, Double.NaN), Row(null, null), Row(null, 1.0),
Row(-1.0, null), Row(Double.NaN, null)))
val schema = StructType(Seq(StructField("input1", DoubleType, nullable = true),
StructField("input2", DoubleType, nullable = true)))
val dfNaN = spark.createDataFrame(rows, schema)
val resNaN = dfNaN.stat.approxQuantile("input1", Array(q1, q2), epsilon)
assert(resNaN.count(_.isNaN) === 0)
// test approxQuantile on multi-column NaN values
val dfNaN2 = Seq((Double.NaN, 1.0), (1.0, 1.0), (-1.0, Double.NaN), (Double.NaN, Double.NaN))
.toDF("input1", "input2")
val resNaN2 = dfNaN2.stat.approxQuantile(Array("input1", "input2"),
Array(q1, q2), epsilons.head)
assert(resNaN.count(_ == null) === 0)

val resNaN2 = dfNaN.stat.approxQuantile(Array("input1", "input2"),
Array(q1, q2), epsilon)
assert(resNaN2.flatten.count(_.isNaN) === 0)
assert(resNaN2.flatten.count(_ == null) === 0)
}

test("crosstab") {
Expand Down