-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-24935][SQL] fix Hive UDAF with two aggregation buffers #24144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Hi @pgandhi999 , I think you are right about the mismatch between Hive UDAF and Spark UDAF framework. Since this is a regression, and it may take a long time for you to get familiar with the Spark aggregate framework, I take it over and try to get this in before 2.4.1. Please take a look, thanks! |
|
also cc @gatorsmile |
|
@cloud-fan Yes, you are right, this fix looks better. Will review the same. Thank you. |
|
@cloud-fan I tested your PR with the test case mentioned in JIRA and it fails with the following error: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC you are checking whether buffer passed in the method is null and based on that you create a partial2 mode buffer. What if the buffer is not null but is of type partial1? Will that cause issues here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can happen if Spark initializes a UDAF, run update and then run merge. I don't think that will happen in Spark.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, I came through the DataSketches hll issue, and find it is still problematic in spark 2.4.1, which was released with this change. Briefly the test case @pgandhi999 posted here #23778 is not passed from Spark 2.4.1. However another testcase passed when I created a DataFrame only based on an single-object array, which means the DataFrame is actually not distributed into multiple threads. I believe what @pgandhi999 said here came true, and more scary, which is unexpected in Spark per @cloud-fan. I post the error log bellow:
Caused by: java.lang.ClassCastException: com.yahoo.sketches.hive.hll.SketchState cannot be cast to com.yahoo.sketches.hive.hll.UnionState
at com.yahoo.sketches.hive.hll.SketchEvaluator.merge(SketchEvaluator.java:56)
at com.yahoo.sketches.hive.hll.DataToSketchUDAF$DataToSketchEvaluator.merge(DataToSketchUDAF.java:100)
at org.apache.spark.sql.hive.HiveUDAFFunction.merge(hiveUDFs.scala:430)
at org.apache.spark.sql.hive.HiveUDAFFunction.merge(hiveUDFs.scala:307)
at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.merge(interfaces.scala:539)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$2.apply(AggregationIterator.scala:174)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$2.apply(AggregationIterator.scala:174)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:188)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:182)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregator$$anon$1.findNextSortedGroup(ObjectAggregationIterator.scala:275)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregator$$anon$1.hasNext(ObjectAggregationIterator.scala:247)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.hasNext(ObjectAggregationIterator.scala:81)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
Test build #103673 has finished for PR 24144 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need a partial2 evaluator and a final evaluator. We just need one final evaluator.
The partial2 evaluator consumes agg buffer and produces agg buffer, while the final evaluator consumers agg buffer and produce final result. That said, the final evaluator can execute merge, and we don't need the partial2 evaluator.
|
Test build #103729 has finished for PR 24144 at commit
|
|
@cloud-fan PR is still failing with the same error as above after the push. |
|
weird, all tests pass at Spark side. Let me revert the removal of the partial2 evaluator and see if it works |
|
@cloud-fan The test works now. Thank you. |
|
Also I figured out that my machine had an issue and hence, your old commit did not get updated. I tested the code without the last commit and that works too. Sorry, my bad. |
|
@pgandhi999 no worry, thanks for your confirmation! Happy to know that my cleanup is corrected :P |
|
Test build #103788 has finished for PR 24144 at commit
|
|
Test build #103794 has finished for PR 24144 at commit
|
gatorsmile
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static enum Mode {
/**
* PARTIAL1: from original data to partial aggregation data:
* iterate() and
* terminatePartial() will be called.
*/
PARTIAL1,
/**
* PARTIAL2: from partial aggregation data to partial aggregation data:
* merge() and
* terminatePartial() will be called.
*/
PARTIAL2,
/**
* FINAL: from partial aggregation to full aggregation:
* merge() and
* terminate() will be called.
*/
FINAL,
/**
* COMPLETE: from original data directly to full aggregation:
* iterate() and
* terminate() will be called.
*/
COMPLETE
};
Could you improve the comments and explain how these four modes are implemented?
|
The 4 modes exactly match what Spark has, although the names are a little different. partial2 is called partial-merge in Spark. The problem here is, Hive UDAF can know the mode during initialization, while Spark can't. Technically Hive UDAF can pick a different buffer implementation for each mode, and to fully support it we need to refactor the Spark aggregate framework to give mode to Spark UDAF as well. This is overkill IMO and this patch is a best-effort to work around it. I think Hive UDAF will only pick a different buffer implementation for different kinds of inputs(original record or agg buffer), which is the case of the sketches library. |
gatorsmile
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. This is our best-effort support Hive UDAF.
Thanks! Merged to master/2.4
## What changes were proposed in this pull request? Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). However, the Hive UDAF adapter in Spark always creates the buffer with partial1 mode, which can only deal with one input: the original data. This PR fixes it. All credits go to pgandhi999 , who investigate the problem and study the Hive UDAF behaviors, and write the tests. close #23778 ## How was this patch tested? a new test Closes #24144 from cloud-fan/hive. Lead-authored-by: pgandhi <[email protected]> Co-authored-by: Wenchen Fan <[email protected]> Signed-off-by: gatorsmile <[email protected]> (cherry picked from commit a6c207c) Signed-off-by: gatorsmile <[email protected]>
…H in Hive UDAF adapter ## What changes were proposed in this pull request? This is a followup of #24144 . #24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH. However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE. This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH. ## How was this patch tested? a new test case Closes #24459 from cloud-fan/hive-udaf. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 7432e7d) Signed-off-by: Wenchen Fan <[email protected]>
## What changes were proposed in this pull request? Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). However, the Hive UDAF adapter in Spark always creates the buffer with partial1 mode, which can only deal with one input: the original data. This PR fixes it. All credits go to pgandhi999 , who investigate the problem and study the Hive UDAF behaviors, and write the tests. close apache#23778 ## How was this patch tested? a new test Closes apache#24144 from cloud-fan/hive. Lead-authored-by: pgandhi <[email protected]> Co-authored-by: Wenchen Fan <[email protected]> Signed-off-by: gatorsmile <[email protected]>
…H in Hive UDAF adapter ## What changes were proposed in this pull request? This is a followup of apache#24144 . apache#24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH. However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE. This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH. ## How was this patch tested? a new test case Closes apache#24459 from cloud-fan/hive-udaf. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
## What changes were proposed in this pull request? backport #24144 and #24459 to 2.3. ## How was this patch tested? existing tests Closes #24539 from cloud-fan/backport. Lead-authored-by: pgandhi <[email protected]> Co-authored-by: Wenchen Fan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
## What changes were proposed in this pull request? Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). However, the Hive UDAF adapter in Spark always creates the buffer with partial1 mode, which can only deal with one input: the original data. This PR fixes it. All credits go to pgandhi999 , who investigate the problem and study the Hive UDAF behaviors, and write the tests. close apache#23778 ## How was this patch tested? a new test Closes apache#24144 from cloud-fan/hive. Lead-authored-by: pgandhi <[email protected]> Co-authored-by: Wenchen Fan <[email protected]> Signed-off-by: gatorsmile <[email protected]> (cherry picked from commit a6c207c) Signed-off-by: gatorsmile <[email protected]>
…H in Hive UDAF adapter ## What changes were proposed in this pull request? This is a followup of apache#24144 . apache#24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH. However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE. This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH. ## How was this patch tested? a new test case Closes apache#24459 from cloud-fan/hive-udaf. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 7432e7d) Signed-off-by: Wenchen Fan <[email protected]>
## What changes were proposed in this pull request? Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). However, the Hive UDAF adapter in Spark always creates the buffer with partial1 mode, which can only deal with one input: the original data. This PR fixes it. All credits go to pgandhi999 , who investigate the problem and study the Hive UDAF behaviors, and write the tests. close apache#23778 ## How was this patch tested? a new test Closes apache#24144 from cloud-fan/hive. Lead-authored-by: pgandhi <[email protected]> Co-authored-by: Wenchen Fan <[email protected]> Signed-off-by: gatorsmile <[email protected]> (cherry picked from commit a6c207c) Signed-off-by: gatorsmile <[email protected]>
…H in Hive UDAF adapter ## What changes were proposed in this pull request? This is a followup of apache#24144 . apache#24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH. However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE. This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH. ## How was this patch tested? a new test case Closes apache#24459 from cloud-fan/hive-udaf. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 7432e7d) Signed-off-by: Wenchen Fan <[email protected]>
## What changes were proposed in this pull request? Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). However, the Hive UDAF adapter in Spark always creates the buffer with partial1 mode, which can only deal with one input: the original data. This PR fixes it. All credits go to pgandhi999 , who investigate the problem and study the Hive UDAF behaviors, and write the tests. close apache#23778 ## How was this patch tested? a new test Closes apache#24144 from cloud-fan/hive. Lead-authored-by: pgandhi <[email protected]> Co-authored-by: Wenchen Fan <[email protected]> Signed-off-by: gatorsmile <[email protected]> (cherry picked from commit a6c207c) Signed-off-by: gatorsmile <[email protected]>
…H in Hive UDAF adapter ## What changes were proposed in this pull request? This is a followup of apache#24144 . apache#24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH. However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE. This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH. ## How was this patch tested? a new test case Closes apache#24459 from cloud-fan/hive-udaf. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 7432e7d) Signed-off-by: Wenchen Fan <[email protected]>
…H in Hive UDAF adapter ## What changes were proposed in this pull request? This is a followup of apache/spark#24144 . #24144 missed one case: when hash aggregate fallback to sort aggregate, the life cycle of UDAF is: INIT -> UPDATE -> MERGE -> FINISH. However, not all Hive UDAF can support it. Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). The buffer for UPDATE may not support MERGE. This PR updates the Hive UDAF adapter in Spark to support INIT -> UPDATE -> MERGE -> FINISH, by turning it to INIT -> UPDATE -> FINISH + IINIT -> MERGE -> FINISH. ## How was this patch tested? a new test case Closes #24459 from cloud-fan/hive-udaf. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 7432e7d) Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the sketches library.
However, the Hive UDAF adapter in Spark always creates the buffer with partial1 mode, which can only deal with one input: the original data. This PR fixes it.
All credits go to @pgandhi999 , who investigate the problem and study the Hive UDAF behaviors, and write the tests.
close #23778
How was this patch tested?
a new test