[SPARK-16283][SQL] Implement percentile_approx SQL function
#14298
Conversation
Test build #62670 has finished for PR 14298 at commit

Test build #62676 has finished for PR 14298 at commit

Test build #62678 has finished for PR 14298 at commit
@hvanhovell could you take a look at this? Thanks!
```scala
/**
 * Helper class to compute approximate quantile summary.
 * This implementation is based on the algorithm proposed in the paper:
```
why did you need to move the implementation around?
thanks!
Module sql/core depends on module sql/catalyst, but module sql/catalyst does not depend on module sql/core; so in order to enable expressions like PercentileApprox (which lives in sql/catalyst) to use the G-K algorithm, the algorithm should probably live in sql/catalyst too. As a reference, algorithms like HyperLogLog also live in sql/catalyst (please refer to HyperLogLogPlusPlus).
@thunterdb does this make sense?
Yes, this makes sense, thanks.
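For illustration only, the dependency direction described above can be sketched in sbt terms. The module names follow Spark's source layout, but the settings below are simplified assumptions, not Spark's actual build definition:

```scala
// Simplified sketch: sql/catalyst stands alone, and sql/core depends on it.
// Anything a catalyst expression needs (e.g. the G-K QuantileSummaries)
// must therefore live in catalyst, not core.
lazy val catalyst = project in file("sql/catalyst")

lazy val core = (project in file("sql/core"))
  .dependsOn(catalyst) // core -> catalyst, never the reverse
```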
# Conflicts:
#	sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@cloud-fan could you also help review this? Thanks!
Test build #62706 has finished for PR 14298 at commit
Jenkins retest this please
```scala
 */
private def validatePercentilesLiteral(exp: Expression): (Seq[Double], Boolean) = {
  def withinRange(v: Double): Boolean = 0.0 <= v && v <= 1.0
  exp match {
```
You could also pattern match on the result of the exp.eval(). That would be way easier.
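As a rough sketch of that suggestion, outside Spark and with `Any` standing in for the value returned by `exp.eval()` (the object and method names below are assumptions for illustration, not the PR's actual code), matching on the evaluated result could look like:

```scala
// Hypothetical, simplified version of the reviewer's suggestion: evaluate the
// literal expression first, then pattern match on the resulting value instead
// of matching on expression tree shapes.
object PercentileValidation {
  private def withinRange(v: Double): Boolean = 0.0 <= v && v <= 1.0

  // `evaluated` stands in for the result of exp.eval(); returns the requested
  // percentiles plus a flag saying whether the input was an array.
  def validatePercentiles(evaluated: Any): (Seq[Double], Boolean) = evaluated match {
    case d: Double if withinRange(d) =>
      (Seq(d), false)
    case arr: Seq[_] if arr.nonEmpty && arr.forall {
        case d: Double => withinRange(d)
        case _         => false
      } =>
      (arr.map(_.asInstanceOf[Double]), true)
    case other =>
      throw new IllegalArgumentException(s"Invalid percentile value(s): $other")
  }
}
```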
Test build #62881 has finished for PR 14298 at commit
```scala
 * large numbers of rows where the regular percentile() UDAF might run out of memory.
 *
 * The input is a single double value or an array of double values representing the percentiles
 * requested. The output, corresponding to the input, is either an single double value or an
```
minor: an single -> a single
Test build #63105 has finished for PR 14298 at commit
@hvanhovell comments addressed. Please let me know when there's more to do!
```scala
// The number of intermediate outputs is highly relative to the actual data-set (an upper bound is
// (11/2e)log(2en), where e is the relativeError parameter, n is the number of items in the
// dataset) -- thus it's hard to allocate agg buffer in advance without knowing the size of
// inputs. Due to this reason, currently we don't support partial mode.
```
can you explain a bit more about this? AFAIK, Hive supports partial aggregation for percentile_approx. It looks to me that your implementation keeps the buffer data (QuantileSummaries) in this aggregate function object, instead of letting the aggregate operator manage it; I think that's the main reason why we can't support partial aggregation for percentile_approx.
> hive supports partial aggregate for percentile_approx
Hive's implementation computes approximate percentile values from a histogram, thus Hive supports partial aggregation (but makes no approximation guarantees).
> ... QuantileSummaries in this aggregate function object, instead of letting aggregate operator manage it, that's the main reason why we can't support partial aggregate
Yes that's quite right. QuantileSummaries has been implemented and well tested prior to this patch, so it'd be great if we can reuse that and put a QuantileSummaries instance directly into the aggregation buffer (in order to support partial aggregation). @cloud-fan any pointer on how to do that please?
Thanks!
to be clear, are you saying that our current algorithm to compute percentile_approx can't support partial aggregation fundamentally?
Fundamentally, the QuantileSummaries implementation does support RDD-style partial aggregation -- QuantileSummaries itself is the agg buffer of a partition's data at mappers, and multiple QuantileSummaries instances will be merged at reducers (please refer to StatFunctions.multipleApproxQuantiles).
Fundamentally, it should also support SparkSQL-style partial aggregation. I'm trying to reuse QuantileSummaries here; if there's no easy way to reuse it, I'm afraid we'll have to completely rewrite this G-K algorithm to support SparkSQL-style partial aggregation.
So is there any way we can directly put a QuantileSummaries instance into SparkSQL's agg buffer? Or do we have to break a QuantileSummaries instance up, say into an int + a double + a double[] + an int[] + an int[], which SparkSQL's agg buffer can manage?
I think putting the buffer object in the agg buffer row is better, but that needs to be well designed. Can you hold this PR for a while? We are discussing it internally.
Sure, this can wait. Thanks for the information!
Some updates: #14753 was created to support putting generic objects in the aggregation buffer.
@lw-lin
@clockfly
…which supports partial aggregation.

## What changes were proposed in this pull request?

This PR implements the aggregation function `percentile_approx`. Function `percentile_approx` returns the approximate percentile(s) of a column at the given percentage(s). A percentile is a watermark value below which a given percentage of the column values fall. For example, the percentile of column `col` at percentage 50% is the median value of column `col`.

### Syntax:
```
# Returns the percentile at a given percentage value. The approximation error can be reduced by increasing the accuracy parameter, at the cost of memory.
percentile_approx(col, percentage [, accuracy])

# Returns a percentile value array for a given percentage value array
percentile_approx(col, array(percentage1 [, percentage2]...) [, accuracy])
```

### Features:
1. This function supports partial aggregation.
2. The memory consumption is bounded. The larger the `accuracy` parameter, the smaller the error. The default accuracy value is 10000, to match Hive's default setting. Choose a smaller value for a smaller memory footprint.
3. This function supports window function aggregation.

### Example usages:
```
## Returns the 25th percentile value, with default accuracy
SELECT percentile_approx(col, 0.25) FROM table

## Returns an array of percentile values (25th, 50th, 75th), with default accuracy
SELECT percentile_approx(col, array(0.25, 0.5, 0.75)) FROM table

## Returns the 25th percentile value with custom accuracy 100; a larger accuracy parameter yields a smaller approximation error
SELECT percentile_approx(col, 0.25, 100) FROM table

## Returns the 25th and 50th percentile values, with custom accuracy 100
SELECT percentile_approx(col, array(0.25, 0.5), 100) FROM table
```

### NOTE:
1. The `percentile_approx` implementation is different from Hive's, so the result returned for the same query may differ slightly from Hive's. This implementation uses `QuantileSummaries` as the underlying probabilistic data structure, and mainly follows the paper `Space-efficient Online Computation of Quantile Summaries` by Greenwald, Michael and Khanna, Sanjeev (http://dx.doi.org/10.1145/375663.375670).
2. The current implementation of `QuantileSummaries` doesn't support automatic compression. This PR has a rule to do compression automatically at the caller side, but it may not be optimal.

## How was this patch tested?

Unit test and SQL query test.

## Acknowledgement

1. This PR's work is based on lw-lin's PR apache#14298, with improvements like supporting partial aggregation and fixing an out-of-memory issue.

Author: Sean Zhong <[email protected]>

Closes apache#14868 from clockfly/appro_percentile_try_2.
## What changes were proposed in this pull request?

This patch implements the `percentile_approx` SQL function using Spark's implementation of the G-K algorithm:
- moves the G-K algorithm implementation (`QuantileSummaries` and related tests) from `sql/core` to `sql/catalyst`
- implements `percentile_approx` using the G-K algorithm

## How was this patch tested?