From a8f1b331881a907ce4b82ada3dc077471a8f6271 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 6 Apr 2016 14:22:18 +0000 Subject: [PATCH 1/7] Add API to compute approxQuantile for multiple columns. --- .../spark/sql/DataFrameStatFunctions.scala | 16 ++++++++++++++++ .../apache/spark/sql/DataFrameStatSuite.scala | 13 +++++++++++++ 2 files changed, 29 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 3eb1f0f0d58ff..a8885ca001207 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -70,6 +70,14 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { StatFunctions.multipleApproxQuantiles(df, Seq(col), probabilities, relativeError).head.toArray } + def approxQuantile( + cols: Array[String], + probabilities: Array[Double], + relativeError: Double): Array[Array[Double]] = { + StatFunctions.multipleApproxQuantiles(df, cols, probabilities, relativeError) + .map(_.toArray).toArray + } + /** * Python-friendly version of [[approxQuantile()]] */ @@ -80,6 +88,14 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { approxQuantile(col, probabilities.toArray, relativeError).toList.asJava } + private[spark] def approxQuantile( + cols: List[String], + probabilities: List[Double], + relativeError: Double): java.util.List[java.util.List[Double]] = { + approxQuantile(cols.toArray, probabilities.toArray, relativeError) + .map(_.toList.asJava).toList.asJava + } + /** * Calculate the sample covariance of two numerical columns of a DataFrame. * @param col1 the name of the first column diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 0ea7727e45029..431eb5a0be5a4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -150,6 +150,19 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext { assert(math.abs(d1 - 2 * q1 * n) < error_double) assert(math.abs(d2 - 2 * q2 * n) < error_double) } + + for (epsilon <- epsilons) { + val Array(Array(s1, s2), Array(d1, d2)) = df.stat.approxQuantile(Array("singles", "doubles"), + Array(q1, q2), epsilon) + + val error_single = 2 * 1000 * epsilon + val error_double = 2 * 2000 * epsilon + + assert(math.abs(s1 - q1 * n) < error_single) + assert(math.abs(s2 - q2 * n) < error_single) + assert(math.abs(d1 - 2 * q1 * n) < error_double) + assert(math.abs(d2 - 2 * q2 * n) < error_double) + } } test("crosstab") { From 47d52b9f12cf30978b966c970261baffb48cd45c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 6 Apr 2016 14:27:11 +0000 Subject: [PATCH 2/7] Add comment. --- .../spark/sql/DataFrameStatFunctions.scala | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index a8885ca001207..85f9a6ca05c4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -52,14 +52,14 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670 Space-efficient * Online Computation of Quantile Summaries]] by Greenwald and Khanna. * - * @param col the name of the numerical column + * @param col the name of the numerical column. * @param probabilities a list of quantile probabilities * Each number must belong to [0, 1]. * For example 0 is the minimum, 0.5 is the median, 1 is the maximum. * @param relativeError The relative target precision to achieve (>= 0). * If set to zero, the exact quantiles are computed, which could be very expensive. * Note that values greater than 1 are accepted but give the same result as 1. - * @return the approximate quantiles at the given probabilities + * @return the approximate quantiles at the given probabilities. * * @since 2.0.0 */ @@ -70,6 +70,20 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { StatFunctions.multipleApproxQuantiles(df, Seq(col), probabilities, relativeError).head.toArray } + /** + * Calculates the approximate quantiles of numerical columns of a DataFrame. + * + * @param cols the names of the numerical columns. + * @param probabilities a list of quantile probabilities + * Each number must belong to [0, 1]. + * For example 0 is the minimum, 0.5 is the median, 1 is the maximum. + * @param relativeError The relative target precision to achieve (>= 0). + * If set to zero, the exact quantiles are computed, which could be very expensive. + * Note that values greater than 1 are accepted but give the same result as 1. + * @return the approximate quantiles at the given probabilities for given columns. + * + * @since 2.0.0 + */ def approxQuantile( cols: Array[String], probabilities: Array[Double], @@ -88,6 +102,10 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { approxQuantile(col, probabilities.toArray, relativeError).toList.asJava } + /** + * Python-friendly version of [[approxQuantile()]] that computes approximate quantiles + * for multiple columns. + */ private[spark] def approxQuantile( cols: List[String], probabilities: List[Double], From 75edcb1a657e2222d31de674f03af374f1966f8c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 7 Apr 2016 10:35:15 +0000 Subject: [PATCH 3/7] Address comments and change Python API too. --- python/pyspark/sql/dataframe.py | 25 +++++++++++++------ python/pyspark/sql/tests.py | 8 ++++++ .../spark/sql/DataFrameStatFunctions.scala | 1 + .../apache/spark/sql/DataFrameStatSuite.scala | 18 ++++++------- 4 files changed, 33 insertions(+), 19 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index d473d6b534647..03db6e8c3bbe0 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1162,7 +1162,7 @@ def replace(self, to_replace, value, subset=None): self._jdf.na().replace(self._jseq(subset), self._jmap(rep_dict)), self.sql_ctx) @since(2.0) - def approxQuantile(self, col, probabilities, relativeError): + def approxQuantile(self, cols, probabilities, relativeError): """ Calculates the approximate quantiles of a numerical column of a DataFrame. @@ -1181,7 +1181,7 @@ def approxQuantile(self, col, probabilities, relativeError): Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna. - :param col: the name of the numerical column + :param cols: the name(s) of the numerical column(s) :param probabilities: a list of quantile probabilities Each number must belong to [0, 1]. For example 0 is the minimum, 0.5 is the median, 1 is the maximum. @@ -1191,8 +1191,13 @@ def approxQuantile(self, col, probabilities, relativeError): accepted but give the same result as 1. :return: the approximate quantiles at the given probabilities """ - if not isinstance(col, str): - raise ValueError("col should be a string.") + if not isinstance(cols, (str, list, tuple)): + raise ValueError("col should be a string, list or tuple.") + + if isinstance(cols, tuple): + cols = list(cols) + if isinstance(cols, list): + cols = _to_list(self._sc, cols) if not isinstance(probabilities, (list, tuple)): raise ValueError("probabilities should be a list or tuple") @@ -1207,8 +1212,12 @@ def approxQuantile(self, col, probabilities, relativeError): raise ValueError("relativeError should be numerical (float, int, long) >= 0.") relativeError = float(relativeError) - jaq = self._jdf.stat().approxQuantile(col, probabilities, relativeError) - return list(jaq) + jaq = self._jdf.stat().approxQuantile(cols, probabilities, relativeError) + jaq = list(jaq) + for idx, a in enumerate(jaq): + if not isinstance(a, (list, float)): + jaq[idx] = list(a) + return jaq @since(1.4) def corr(self, col1, col2, method=None): @@ -1440,8 +1449,8 @@ class DataFrameStatFunctions(object): def __init__(self, df): self.df = df - def approxQuantile(self, col, probabilities, relativeError): - return self.df.approxQuantile(col, probabilities, relativeError) + def approxQuantile(self, cols, probabilities, relativeError): + return self.df.approxQuantile(cols, probabilities, relativeError) approxQuantile.__doc__ = DataFrame.approxQuantile.__doc__ diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index e4f79c911c0d9..83d6cb07dbb9a 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -702,6 +702,14 @@ def test_approxQuantile(self): self.assertEqual(len(aq), 3) self.assertTrue(all(isinstance(q, float) for q in aq)) + aqs = df.stat.approxQuantile(["a", "a"], [0.1, 0.5, 0.9], 0.1) + self.assertTrue(isinstance(aqs[0], list)) + self.assertEqual(len(aqs[0]), 3) + self.assertTrue(all(isinstance(q, float) for q in aqs[0])) + self.assertTrue(isinstance(aqs[1], list)) + self.assertEqual(len(aqs[1]), 3) + self.assertTrue(all(isinstance(q, float) for q in aqs[1])) + def test_corr(self): import math df = self.sc.parallelize([Row(a=i, b=math.sqrt(i)) for i in range(10)]).toDF() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 85f9a6ca05c4d..512b1bf1bc91b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -72,6 +72,7 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { /** * Calculates the approximate quantiles of numerical columns of a DataFrame. + * @see approxQuantile for detailed description. * * @param cols the names of the numerical columns. * @param probabilities a list of quantile probabilities diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 431eb5a0be5a4..657de10c7b182 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -149,19 +149,15 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext { assert(math.abs(s2 - q2 * n) < error_single) assert(math.abs(d1 - 2 * q1 * n) < error_double) assert(math.abs(d2 - 2 * q2 * n) < error_double) - } - - for (epsilon <- epsilons) { - val Array(Array(s1, s2), Array(d1, d2)) = df.stat.approxQuantile(Array("singles", "doubles"), - Array(q1, q2), epsilon) - val error_single = 2 * 1000 * epsilon - val error_double = 2 * 2000 * epsilon + // Multiple columns + val Array(Array(ms1, ms2), Array(md1, md2)) = + df.stat.approxQuantile(Array("singles", "doubles"), Array(q1, q2), epsilon) - assert(math.abs(s1 - q1 * n) < error_single) - assert(math.abs(s2 - q2 * n) < error_single) - assert(math.abs(d1 - 2 * q1 * n) < error_double) - assert(math.abs(d2 - 2 * q2 * n) < error_double) + assert(math.abs(ms1 - q1 * n) < error_single) + assert(math.abs(ms2 - q2 * n) < error_single) + assert(math.abs(md1 - 2 * q1 * n) < error_double) + assert(math.abs(md2 - 2 * q2 * n) < error_double) } } From 619660d72733f38dc947c576b794375018e2ecb0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 8 Apr 2016 14:30:45 +0000 Subject: [PATCH 4/7] Address comments. --- python/pyspark/sql/dataframe.py | 19 +++++++++++-------- python/pyspark/sql/tests.py | 1 + 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 03db6e8c3bbe0..c1efa88ab1eca 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1181,15 +1181,18 @@ def approxQuantile(self, cols, probabilities, relativeError): Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna. - :param cols: the name(s) of the numerical column(s) + :param cols: str, list. + The name(s) of the numerical column(s). Can be a string of the name + of a single column or the list of the names of multiple columns. :param probabilities: a list of quantile probabilities - Each number must belong to [0, 1]. - For example 0 is the minimum, 0.5 is the median, 1 is the maximum. - :param relativeError: The relative target precision to achieve - (>= 0). If set to zero, the exact quantiles are computed, which - could be very expensive. Note that values greater than 1 are - accepted but give the same result as 1. - :return: the approximate quantiles at the given probabilities + Each number must belong to [0, 1]. + For example 0 is the minimum, 0.5 is the median, 1 is the maximum. + :param relativeError: The relative target precision to achieve + (>= 0). If set to zero, the exact quantiles are computed, which + could be very expensive. Note that values greater than 1 are + accepted but give the same result as 1. + :return: the approximate quantiles at the given probabilities for + the given column or columns. """ if not isinstance(cols, (str, list, tuple)): raise ValueError("col should be a string, list or tuple.") diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 83d6cb07dbb9a..5f8fa42deb349 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -703,6 +703,7 @@ def test_approxQuantile(self): self.assertTrue(all(isinstance(q, float) for q in aq)) aqs = df.stat.approxQuantile(["a", "a"], [0.1, 0.5, 0.9], 0.1) + self.assertEqual(len(aqs), 2) self.assertTrue(isinstance(aqs[0], list)) self.assertEqual(len(aqs[0]), 3) self.assertTrue(all(isinstance(q, float) for q in aqs[0])) From b64bd4eeedf5926eb644ae64f954eea2356de906 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 8 Apr 2016 14:35:39 +0000 Subject: [PATCH 5/7] Update comment. --- .../scala/org/apache/spark/sql/DataFrameStatFunctions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 512b1bf1bc91b..0c2ecb2bb2ddf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -72,7 +72,7 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { /** * Calculates the approximate quantiles of numerical columns of a DataFrame. - * @see approxQuantile for detailed description. + * @see #approxQuantile(String, Array[Double], Double) for detailed description. * * @param cols the names of the numerical columns. * @param probabilities a list of quantile probabilities From 89d4d3e978379858e6557343d21d3ae4aa263cfb Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 11 Apr 2016 07:03:20 +0000 Subject: [PATCH 6/7] Slightly modify comment. --- python/pyspark/sql/dataframe.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index c1efa88ab1eca..314c2f45f42b6 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1182,8 +1182,7 @@ def approxQuantile(self, cols, probabilities, relativeError): by Greenwald and Khanna. :param cols: str, list. - The name(s) of the numerical column(s). Can be a string of the name - of a single column or the list of the names of multiple columns. + Can be a single column name, or a list of names for multiple columns. :param probabilities: a list of quantile probabilities Each number must belong to [0, 1]. For example 0 is the minimum, 0.5 is the median, 1 is the maximum. From 4309001b0ff4d5b0571f05f4ea58f36855271b99 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 12 Apr 2016 01:44:42 +0000 Subject: [PATCH 7/7] Check the content of list. --- python/pyspark/sql/dataframe.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 314c2f45f42b6..db8c6cd41a8f2 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1199,6 +1199,9 @@ def approxQuantile(self, cols, probabilities, relativeError): if isinstance(cols, tuple): cols = list(cols) if isinstance(cols, list): + for c in cols: + if not isinstance(c, str): + raise ValueError("column name should be string.") cols = _to_list(self._sc, cols) if not isinstance(probabilities, (list, tuple)):