Skip to content

Conversation

@clockfly
Copy link
Contributor

@clockfly clockfly commented Aug 29, 2016

What changes were proposed in this pull request?

This PR implements aggregation function percentile_approx. Function percentile_approx returns the approximate percentile(s) of a column at the given percentage(s). A percentile is a watermark value below which a given percentage of the column values fall. For example, the percentile of column col at percentage 50% is the median value of column col.

Syntax:

# Returns percentile at a given percentage value. The approximation error can be reduced by increasing parameter accuracy, at the cost of memory.
percentile_approx(col, percentage [, accuracy])

# Returns percentile value array at given percentage value array 
percentile_approx(col, array(percentage1 [, percentage2]...) [, accuracy])

Features:

  1. This function supports partial aggregation.
  2. The memory consumption is bounded. The larger accuracy parameter we choose, we smaller error we get. The default accuracy value is 10000, to match with Hive default setting. Choose a smaller value for smaller memory footprint.
  3. This function supports window function aggregation.

Example usages:

## Returns the 25th percentile value, with default accuracy
SELECT percentile_approx(col, 0.25) FROM table

## Returns an array of percentile value (25th, 50th, 75th), with default accuracy
SELECT percentile_approx(col, array(0.25, 0.5, 0.75)) FROM table

## Returns 25th percentile value, with custom accuracy value 100, larger accuracy parameter yields smaller approximation error 
SELECT percentile_approx(col, 0.25, 100) FROM table

## Returns the 25th, and 50th percentile values, with custom accuracy value 100
SELECT percentile_approx(col, array(0.25, 0.5), 100) FROM table

NOTE:

  1. The percentile_approx implementation is different from Hive, so the result returned on same query maybe slightly different with Hive. This implementation uses QuantileSummaries as the underlying probabilistic data structure, and mainly follows paper Space-efficient Online Computation of Quantile Summaries by Greenwald, Michael and Khanna, Sanjeev. (http://dx.doi.org/10.1145/375663.375670)`
  2. The current implementation of QuantileSummaries doesn't support automatic compression. This PR has a rule to do compression automatically at the caller side, but it may not be optimal.

How was this patch tested?

Unit test, and Sql query test.

Acknowledgement

  1. This PR's work in based on @lw-lin's PR [SPARK-16283][SQL] Implement percentile_approx SQL function #14298, with improvements like supporting partial aggregation, fixing out of memory issue.

@clockfly clockfly force-pushed the appro_percentile_try_2 branch from d71397c to 88232ed Compare August 29, 2016 21:26
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it as a case class, so that we can use the expression encoder to serialize it.

@clockfly clockfly force-pushed the appro_percentile_try_2 branch from 88232ed to 2bfe79e Compare August 29, 2016 21:42
@clockfly
Copy link
Contributor Author

The test failure is caused by https://issues.apache.org/jira/browse/SPARK-17289


// An case class to wrap fields of QuantileSummaries, so that we can use the expression encoder
// to serialize it.
case class QuantileSummariesData(
Copy link
Contributor Author

@clockfly clockfly Aug 29, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QuantileSummaries contains some private var fields, so I think it is better to define a wrapper class instead of changing QuantileSummaries to case class (case class is not supposed to contain var fields).

@clockfly
Copy link
Contributor Author

@thunterdb

  1. Looks like current impl of QuantileSummaries will not do automatically compression in some cases.
    For example:
    (1 to 10000000).foreach(qualileSummaries.insert(_)) probably will trigger OOM. Is this expected?

  2. The following code will add two sample Stats in QuantileSummaries, is this expected? This results in some strange behavior when used in window function.

    val quantileSummaries = new QuantileSummaries(1000, 0.001)
    quantileSummaries.insert(100.0D)
    val updated = quantileSummaries.compress()
    
    // Is this supposed to be 2?
    assert(updated.sampled.length == 2)
    

@clockfly
Copy link
Contributor Author

@yhuai @cloud-fan, can you take a look?

}
}

test("percentile_approx(), aggregation on empty input table, with group by") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SortBasedAggregationIterator has different code path for "with group by" and "no group by", so it is better that we test both cases.

@SparkQA
Copy link

SparkQA commented Aug 29, 2016

Test build #64600 has finished for PR 14868 at commit 88232ed.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ApproximatePercentile(
    • class PercentileDigest(
    • class PercentileDigestSerializer
    • case class QuantileSummaries(

@SparkQA
Copy link

SparkQA commented Aug 29, 2016

Test build #64602 has finished for PR 14868 at commit 6be2ebe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ApproximatePercentile(
    • class PercentileDigest(
    • class PercentileDigestSerializer
    • case class QuantileSummariesData(

@clockfly clockfly force-pushed the appro_percentile_try_2 branch from dc09d8c to 9d7fb85 Compare August 30, 2016 03:19
@SparkQA
Copy link

SparkQA commented Aug 30, 2016

Test build #64621 has finished for PR 14868 at commit dc09d8c.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 30, 2016

Test build #64623 has finished for PR 14868 at commit 9d7fb85.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@clockfly
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Aug 30, 2016

Test build #64639 has finished for PR 14868 at commit 9d7fb85.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@clockfly clockfly force-pushed the appro_percentile_try_2 branch from 9d7fb85 to 2ab4a11 Compare August 30, 2016 08:57
@clockfly clockfly force-pushed the appro_percentile_try_2 branch from 2ab4a11 to 3dda1ae Compare August 30, 2016 09:10
*
* @param child child expression that can produce column value with `child.eval(inputRow)`
* @param percentageExpression Expression that represents a single percentage value or
* a array of percentage values. Each percentage value must be between
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: "a array" => "an array"

@SparkQA
Copy link

SparkQA commented Aug 30, 2016

Test build #64642 has finished for PR 14868 at commit 2ab4a11.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 30, 2016

Test build #64643 has finished for PR 14868 at commit 3dda1ae.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 30, 2016

Test build #64646 has finished for PR 14868 at commit 96bd5a4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

""")
case class ApproximatePercentile(
child: Expression,
percentageExpression: Expression,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sql string is automatically generated, without hacking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we can use the InputType Check facility to get a more consistent error message if check fails.

@SparkQA
Copy link

SparkQA commented Aug 30, 2016

Test build #64653 has finished for PR 14868 at commit 539d693.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

test("serialize and de-serialize") {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this new line.

@liancheng
Copy link
Contributor

liancheng commented Aug 30, 2016

LGTM except for a few minor issues. The failed test should be a typo in this line.

@clockfly clockfly changed the title Implements percentile_approx aggregation function which supports partial aggregation. [SPARK-16283][SQL][WIP] Implements percentile_approx aggregation function which supports partial aggregation. Aug 31, 2016
@SparkQA
Copy link

SparkQA commented Aug 31, 2016

Test build #64676 has finished for PR 14868 at commit 1965b1a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 31, 2016

Test build #64687 has finished for PR 14868 at commit 421e211.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@clockfly clockfly force-pushed the appro_percentile_try_2 branch 2 times, most recently from 3563ce9 to bc70a00 Compare August 31, 2016 03:14
@clockfly clockfly force-pushed the appro_percentile_try_2 branch from bc70a00 to 3f08c02 Compare August 31, 2016 03:17
summaries.sampled.length * (Doubles.BYTES + Ints.BYTES + Ints.BYTES)
}

final def serialize(obj: PercentileDigest): Array[Byte] = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I swap the serialization implementation from ExpressionEncoder because it is not safe to initialize ExpressionEncoder at the Executor side.
(scala reflection doesn't works well with ExecutorClassLoader)

stacktrace:

scala> spark.sql("select percentile_approx(a, 0.5) from t").show
16/08/31 09:17:58 WARN TaskSetManager: Stage 0 contains a task of very large size (327 KB). The maximum recommended task size is 100 KB.
16/08/31 09:17:59 ERROR ExecutorClassLoader: Failed to check existence of class <root>.package on REPL class server at spark://127.0.0.1:58512/classes
java.net.URISyntaxException: Illegal character in path at index 32: spark://127.0.0.1:58512/classes/<root>/package.class
    at java.net.URI$Parser.fail(URI.java:2848)
    at java.net.URI$Parser.checkChars(URI.java:3021)
    at java.net.URI$Parser.parseHierarchical(URI.java:3105)
    at java.net.URI$Parser.parse(URI.java:3053)
    at java.net.URI.<init>(URI.java:588)
    at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:316)
    at org.apache.spark.repl.ExecutorClassLoader.org$apache$spark$repl$ExecutorClassLoader$$getClassFileInputStreamFromSparkRPC(ExecutorClassLoader.scala:90)
    at org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57)
    at org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57)
    at org.apache.spark.repl.ExecutorClassLoader.findClassLocally(ExecutorClassLoader.scala:162)
    at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:80)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.tryJavaClass(JavaMirrors.scala:559)
    at scala.reflect.runtime.SymbolLoaders$PackageScope$$anonfun$lookupEntry$1.apply(SymbolLoaders.scala:137)
    at scala.reflect.runtime.SymbolLoaders$PackageScope$$anonfun$lookupEntry$1.apply(SymbolLoaders.scala:126)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.SymbolLoaders$PackageScope.syncLockSynchronized(SymbolLoaders.scala:124)
    at scala.reflect.runtime.SymbolLoaders$PackageScope.lookupEntry(SymbolLoaders.scala:126)
    at scala.reflect.internal.Types$Type.findDecl(Types.scala:971)
    at scala.reflect.internal.Types$Type.decl(Types.scala:566)
    at scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:335)
    at scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply$mcV$sp(SymbolLoaders.scala:74)
    at scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply(SymbolLoaders.scala:71)
    at scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply(SymbolLoaders.scala:71)
    at scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:263)
    at scala.reflect.runtime.SymbolLoaders$LazyPackageType.complete(SymbolLoaders.scala:71)
    at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anon$1.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(JavaMirrors.scala:66)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anon$1.gilSynchronizedIfNotThreadsafe(JavaMirrors.scala:66)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
    at scala.reflect.runtime.JavaMirrors$JavaMirror$$anon$1.info(JavaMirrors.scala:66)
    at scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:256)
    at scala.reflect.runtime.JavaMirrors$class.scala$reflect$runtime$JavaMirrors$$createMirror(JavaMirrors.scala:32)
    at scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:49)
    at scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:47)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.JavaMirrors$class.runtimeMirror(JavaMirrors.scala:46)
    at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)
    at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)
    at org.apache.spark.sql.catalyst.ScalaReflection$.mirror(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.mirror(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:753)
    at org.apache.spark.sql.catalyst.ScalaReflection$.localTypeOf(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$.definedByConstructorParams(ScalaReflection.scala:712)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:51)
    at org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile$PercentileDigestSerializer.<init>(ApproximatePercentile.scala:273)
    at org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.serializer$lzycompute(ApproximatePercentile.scala:82)
    at org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.serializer(ApproximatePercentile.scala:82)
    at org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.serialize(ApproximatePercentile.scala:167)
    at org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.serialize(ApproximatePercentile.scala:64)
    at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:530)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$2.apply(AggregationIterator.scala:250)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$2.apply(AggregationIterator.scala:246)
    at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152)
    at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:150)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/08/31 09:17:59 ERROR ExecutorClassLoader: Failed to check existence of class <root>.scala on REPL class server at spark://127.0.0.1:58512/classes
java.net.URISyntaxException: Illegal character in path at index 32: spark://127.0.0.1:58512/classes/<root>/scala.class
    at java.net.URI$Parser.fail(URI.java:2848)
    at java.net.URI$Parser.checkChars(URI.java:3021)
    at java.net.URI$Parser.parseHierarchical(URI.java:3105)
    at java.net.URI$Parser.parse(URI.java:3053)
    at java.net.URI.<init>(URI.java:588)
    at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:316)
    at org.apache.spark.repl.ExecutorClassLoader.org$apache$spark$repl$ExecutorClassLoader$$getClassFileInputStreamFromSparkRPC(ExecutorClassLoader.scala:90)
    at org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57)
    at org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57)
    at org.apache.spark.repl.ExecutorClassLoader.findClassLocally(ExecutorClassLoader.scala:162)
    at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:80)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:555)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.tryJavaClass(JavaMirrors.scala:559)
    at scala.reflect.runtime.SymbolLoaders$PackageScope$$anonfun$lookupEntry$1.apply(SymbolLoaders.scala:137)
    at scala.reflect.runtime.SymbolLoaders$PackageScope$$anonfun$lookupEntry$1.apply(SymbolLoaders.scala:126)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.SymbolLoaders$PackageScope.syncLockSynchronized(SymbolLoaders.scala:124)
    at scala.reflect.runtime.SymbolLoaders$PackageScope.lookupEntry(SymbolLoaders.scala:126)
    at scala.reflect.internal.tpe.FindMembers$FindMemberBase.walkBaseClasses(FindMembers.scala:88)
    at scala.reflect.internal.tpe.FindMembers$FindMemberBase.searchConcreteThenDeferred(FindMembers.scala:56)
    at scala.reflect.internal.tpe.FindMembers$FindMemberBase.apply(FindMembers.scala:48)
    at scala.reflect.internal.Types$Type.scala$reflect$internal$Types$Type$$findMemberInternal$1(Types.scala:1014)
    at scala.reflect.internal.Types$Type.findMember(Types.scala:1016)
    at scala.reflect.internal.Types$Type.memberBasedOnName(Types.scala:631)
    at scala.reflect.internal.Types$Type.member(Types.scala:600)
    at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
    at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45)
    at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:66)
    at scala.reflect.internal.Mirrors$RootsBase.staticPackage(Mirrors.scala:204)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.staticPackage(JavaMirrors.scala:82)
    at scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:263)
    at scala.reflect.runtime.JavaMirrors$class.scala$reflect$runtime$JavaMirrors$$createMirror(JavaMirrors.scala:32)
    at scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:49)
    at scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:47)
    at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
    at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
    at scala.reflect.runtime.JavaMirrors$class.runtimeMirror(JavaMirrors.scala:46)
    at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)
    at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)
    at org.apache.spark.sql.catalyst.ScalaReflection$.mirror(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.mirror(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:753)
    at org.apache.spark.sql.catalyst.ScalaReflection$.localTypeOf(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$.definedByConstructorParams(ScalaReflection.scala:712)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:51)
    at org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile$PercentileDigestSerializer.<init>(ApproximatePercentile.scala:273)
    at org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.serializer$lzycompute(ApproximatePercentile.scala:82)
    at org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.serializer(ApproximatePercentile.scala:82)
    at org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.serialize(ApproximatePercentile.scala:167)
    at org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.serialize(ApproximatePercentile.scala:64)
    at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:530)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$2.apply(AggregationIterator.scala:250)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$2.apply(AggregationIterator.scala:246)
    at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152)
    at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:150)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

@SparkQA
Copy link

SparkQA commented Aug 31, 2016

Test build #64695 has finished for PR 14868 at commit bc70a00.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 31, 2016

Test build #64696 has finished for PR 14868 at commit 3f08c02.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@clockfly clockfly changed the title [SPARK-16283][SQL][WIP] Implements percentile_approx aggregation function which supports partial aggregation. [SPARK-16283][SQL] Implements percentile_approx aggregation function which supports partial aggregation. Aug 31, 2016
/** In-place merges in another PercentileDigest. */
def merge(other: PercentileDigest): Unit = {
if (!isCompressed) compress()
summaries = summaries.merge(other.quantileSummaries)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the other need be compressed too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. other.quantileSummaries will get a compression version of QuantileSummaries.

@cloud-fan
Copy link
Contributor

LGTM, merging to master!

@asfgit asfgit closed this in a18c169 Sep 1, 2016
clockfly added a commit to clockfly/spark that referenced this pull request Sep 22, 2016
…which supports partial aggregation.

This is cherry-pick of feature on open source master branch (hash: databricks/runtime@f003e0c).

## What changes were proposed in this pull request?

This PR implements aggregation function `percentile_approx`. Function `percentile_approx` returns the approximate percentile(s) of a column at the given percentage(s). A percentile is a watermark value below which a given percentage of the column values fall. For example, the percentile of column `col` at percentage 50% is the median value of column `col`.

### Syntax:
```
# Returns percentile at a given percentage value. The approximation error can be reduced by increasing parameter accuracy, at the cost of memory.
percentile_approx(col, percentage [, accuracy])

# Returns percentile value array at given percentage value array
percentile_approx(col, array(percentage1 [, percentage2]...) [, accuracy])
```

### Features:
1. This function supports partial aggregation.
2. The memory consumption is bounded. The larger `accuracy` parameter we choose, we smaller error we get. The default accuracy value is 10000, to match with Hive default setting. Choose a smaller value for smaller memory footprint.
3.  This function supports window function aggregation.

### Example usages:
```
## Returns the 25th percentile value, with default accuracy
SELECT percentile_approx(col, 0.25) FROM table

## Returns an array of percentile value (25th, 50th, 75th), with default accuracy
SELECT percentile_approx(col, array(0.25, 0.5, 0.75)) FROM table

## Returns 25th percentile value, with custom accuracy value 100, larger accuracy parameter yields smaller approximation error
SELECT percentile_approx(col, 0.25, 100) FROM table

## Returns the 25th, and 50th percentile values, with custom accuracy value 100
SELECT percentile_approx(col, array(0.25, 0.5), 100) FROM table
```

### NOTE:
1. The `percentile_approx` implementation is different from Hive, so the result returned on same query maybe slightly different with Hive. This implementation uses `QuantileSummaries` as the underlying probabilistic data structure, and mainly follows paper `Space-efficient Online Computation of Quantile Summaries` by Greenwald, Michael and Khanna, Sanjeev. (http://dx.doi.org/10.1145/375663.375670)`
2. The current implementation of `QuantileSummaries` doesn't support automatic compression. This PR has a rule to do compression automatically at the caller side, but it may not be optimal.

## How was this patch tested?

Unit test, and Sql query test.

## Acknowledgement
1. This PR's work in based on lw-lin's PR apache#14298, with improvements like supporting partial aggregation, fixing out of memory issue.

Author: Sean Zhong <seanzhongdatabricks.com>

Closes apache#14868 from clockfly/appro_percentile_try_2.

Author: Sean Zhong <[email protected]>

Closes apache#73 from clockfly/appro_percentile_branch_2.0.
@gatorsmile
Copy link
Member

Although the function returns the approximate percentile, the result is still deterministic. Is my understanding right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants