[SPARK-24935][SQL] : Problem with Executing Hive UDF's from Spark 2.2 Onwards #23778
Conversation
Created a new abstract class HiveTypedImperativeAggregate, which is a framework for Hive-related aggregation functions. Also, there seems to be a bug in SortBasedAggregator where it was calling merge on the aggregate buffer without initializing it. Have fixed it in this PR.
ok to test
Test build #102315 has finished for PR 23778 at commit
could you add tests in this pr?
@maropu Yes, I am definitely going to add unit tests here. For now I am working on fixing the unit tests, as this PR is failing a bunch of Hive compatibility tests (trying to figure out why :) ).
Test build #102373 has finished for PR 23778 at commit
Test build #102396 has finished for PR 23778 at commit
can you briefly explain how the hive UDAF works? Then we can have more people looking at it and see how to map it to Spark's
Sure @cloud-fan. Thank you for your response. As far as my understanding of Hive UDAF is concerned, I can roughly classify them into two types: those that support partial aggregation (modes PARTIAL and FINAL) and those that do not (mode COMPLETE). For the Hive UDAFs that support partial aggregation, there are five phases:
For the Hive UDAFs that do not support partial aggregation, I have seen the following three phases:
For more information, you may find this link helpful: https://cwiki.apache.org/confluence/display/Hive/GenericUDAFCaseStudy. This information is based on what I have found out during my tests and reading through the docs, and it is based on this information that I have modeled the behaviour of the class.
Test build #102685 has finished for PR 23778 at commit
Test build #102686 has finished for PR 23778 at commit
Test build #102687 has finished for PR 23778 at commit
@maropu @cloud-fan I have added unit tests to the PR for a more detailed understanding of the issue as well as the fix. Thank you.
final override def initialize(buffer: InternalRow): Unit = {
  partial2ModeBuffer = buffer.copy()
  partial2ModeBuffer(mutableAggBufferOffset) = createPartial2ModeAggregationBuffer()
I'm a little lost here. So this HiveTypedImperativeAggregate has 2 buffers? What's the difference between partial2ModeBuffer and buffer?
So Spark Catalyst has designed UDAF execution such that it uses one aggregation buffer for performing the aggregations for all UDAF operators (sort-based, object-hash-based, etc.), which makes sense from Spark's point of view. However, from Hive's point of view, two aggregate buffers are expected to be used: one for PARTIAL1/COMPLETE modes and the other for PARTIAL2/FINAL modes. Since I did not wish to redesign the Catalyst UDAF structure only for Hive, I have left the original calls and buffer as they are for PARTIAL1/COMPLETE mode and have created the partial2ModeBuffer exclusively for PARTIAL2/FINAL mode operations. Thus, to answer your question, buffer here is used for PARTIAL1 mode operations and partial2ModeBuffer is used for PARTIAL2 mode operations. I hope that answers your question. Thank you once again for reviewing, @cloud-fan.
But I don't quite understand how you make the Hive UDAF work with Spark's two phase aggregate?
I see. Can you please elaborate more on the two phase aggregate functionality by Spark? That will help me understand and answer your question better. Thank you.
Let's start with the 5 phases of a UDAF:
- Initialize: The aggregation buffers for PARTIAL1 Mode and PARTIAL2 Mode are created in this phase.
- Iterate (Update): Processes a new row of data into the aggregation buffer created for PARTIAL1.
- TerminatePartial: Returns the contents of the aggregation buffer.
- Merge: Merges a partial aggregation returned by calling terminatePartial() on PARTIAL1 aggregation buffer into the current aggregation happening on PARTIAL2 aggregation buffer.
- Terminate: Returns the final result of the aggregation stored in PARTIAL2 buffer to Hive.
In Spark, a UDAF will be run twice in two adjacent aggregate operators, called partial aggregate and final aggregate. In the partial aggregate, there are 3 steps:
- initialize the UDAF
- update UDAF with input data (so-called Iterate)
- return the UDAF buffer (so-called TerminatePartial)
In the final aggregate, also 3 steps:
- initialize the UDAF
- update UDAF with buffer data from the partial aggregate (so-called Merge)
- return final result (so-called Terminate)
But this doesn't work for the 3-phase UDAF which doesn't support partial aggregate.
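The partial/final flow described above can be sketched with a toy sum aggregator. This is illustrative Java only, not Spark or Hive code; all class and method names are made up, and the aggregation state is modeled as a plain `long[]` buffer:

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for a Hive-style UDAF (NOT the actual
// GenericUDAFEvaluator API): a sum whose partial state is a long,
// driven through the two-phase partial/final flow described above.
public class TwoPhaseSketch {
    // --- the five Hive-style operations, modeled on a long[] buffer ---
    static long[] initialize()               { return new long[]{0L}; } // Initialize
    static void iterate(long[] buf, long v)  { buf[0] += v; }           // Iterate
    static long terminatePartial(long[] buf) { return buf[0]; }         // TerminatePartial
    static void merge(long[] buf, long p)    { buf[0] += p; }           // Merge
    static long terminate(long[] buf)        { return buf[0]; }         // Terminate

    // Partial aggregate: init + iterate + terminatePartial (one per task)
    static long partialAggregate(List<Long> rows) {
        long[] buf = initialize();
        for (long row : rows) iterate(buf, row);
        return terminatePartial(buf);
    }

    // Final aggregate: init + merge + terminate (combines all partials)
    static long finalAggregate(List<Long> partials) {
        long[] buf = initialize();
        for (long p : partials) merge(buf, p);
        return terminate(buf);
    }

    public static void main(String[] args) {
        long p1 = partialAggregate(Arrays.asList(1L, 2L, 3L));
        long p2 = partialAggregate(Arrays.asList(4L, 5L));
        System.out.println(finalAggregate(Arrays.asList(p1, p2))); // 15
    }
}
```

A UDAF without partial-aggregation support never runs `terminatePartial`/`merge`; it can only run the single-phase init + iterate + terminate sequence, which is exactly why the 3-phase case does not fit this two-operator plan.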
In Hive UDAF, when to use which agg buffer? I think this is the most important information to justify your patch. It will be better if you can point to some Hive doc/code comments.
So I went through the Hive docs and asked a couple of people around; officially, Hive does not mention anything about using two different aggregation buffers. The main point is to have some kind of distinction between the different phases of Hive.
Consider a classic map-reduce process. There are two phases: map and reduce (sometimes an optional combine phase in between). The phases can run on different nodes. The state lives within a phase and does not cross the boundaries. The map phase corresponds to the "partial1" mode (init + iterate + terminate partial). The reduce phase corresponds to the "final" mode (init + merge + terminate). The combine phase corresponds to the "partial2" mode (init + merge + terminate partial). The "complete" mode is a special shortcut to run the whole thing as a single phase (init + iterate + terminate). The bug here is about a state crossing the boundaries between the phases: initialized for one phase (mode), but then passed to a different phase. So by using different aggregation buffers, I am trying to encapsulate the corresponding state within a particular phase. The solution can also be modified to have a single aggregation buffer supporting states of different phases.
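The mode-to-phase correspondence above can be summarized as a lookup. This is an illustrative sketch: the mode names follow the four modes discussed in this thread, and the operation strings are purely descriptive, not any actual Hive or Spark API:

```java
// Illustrative mapping (not Hive code) from aggregation mode to the
// operation sequence that mode runs on its buffer, per the map/
// combine/reduce correspondence described above.
public class ModePlan {
    enum Mode { PARTIAL1, PARTIAL2, FINAL, COMPLETE }

    static String[] plan(Mode m) {
        switch (m) {
            case PARTIAL1: return new String[]{"init", "iterate", "terminatePartial"}; // map phase
            case PARTIAL2: return new String[]{"init", "merge",   "terminatePartial"}; // combine phase
            case FINAL:    return new String[]{"init", "merge",   "terminate"};        // reduce phase
            case COMPLETE: return new String[]{"init", "iterate", "terminate"};        // single phase
            default: throw new IllegalArgumentException("unknown mode");
        }
    }

    public static void main(String[] args) {
        // The bug is a buffer initialized for one row of this table
        // being driven through the operations of a different row.
        System.out.println(String.join(" + ", plan(Mode.FINAL))); // init + merge + terminate
    }
}
```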
In my PR above, the assumption is that the Partial1 aggregation buffer supports phases PARTIAL1/COMPLETE and the Partial2 aggregation buffer supports phases PARTIAL2/FINAL.
I shall also paste a link to a good blog that explains the usage of aggregation buffers in a generic Hive UDAF : https://blog.dataiku.com/2013/05/01/a-complete-guide-to-writing-hive-udf
As this is also a kind of design-change problem, it is completely open to further discussion and improvement. My solution is just one of multiple possible solutions that achieve the same thing. However, as far as I can say, it is relatively clean and easy to understand, and it does not change the way existing aggregation functions work with Spark SQL (does not break compatibility).
So there are 4 ways to execute a UDAF
- init + iterate + terminate partial
- init + merge + terminate final
- init + merge + terminate partial
- init + iterate + terminate final
Spark doesn't really have terminate partial. The agg buffer needs to fit the spark schema so Spark can get agg buffer directly. Spark UDAF is flexible: after initialized, the buffer can be updated via either iterate or merge, the buffer can be terminated always.
IIUC init + merge + terminate final is pretty common in GROUP BY queries, and Hive UDAF works in this case. Do you know why?
And your test case is init + iterate + terminate final, what's the correct steps to do it? Is it
1. create partial1 buffer
2. iterate
3. turn partial1 buffer to partial2 buffer
4. terminate final
@cloud-fan Sorry for the delayed response, I agree with your point above. However, I did not understand the question correctly.
IIUC init + merge + terminate final is pretty common in GROUP BY queries, and Hive UDAF works in this case. Do you know why?
And your test case is init + iterate + terminate final
Actually my test case is init + iterate + terminate partial, along with init + iterate + merge + terminate partial, and finally ending with init + merge + terminate final. So according to me, the correct steps here would roughly be:
1. create partial1 buffer
2. iterate
3. merge partial1 buffer into partial2 buffer
4. terminate final
Apologies if I have misread your above comment and have not answered it appropriately, please let me know. Thank you.
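The four steps above can be sketched with plain types. This is illustrative only (not Spark/Hive code, and the buffer types are made up): the point is that the partial1 buffer consumes raw rows via iterate, while the partial2 buffer consumes partial results via merge:

```java
import java.util.Arrays;

// Illustrative two-buffer sketch of the corrected sequence:
// 1. create partial1 buffer, 2. iterate, 3. merge partial1's
// partial result into the partial2 buffer, 4. terminate final.
public class TwoBufferSketch {
    static long[] createPartial1Buffer() { return new long[]{0L}; }
    static long[] createPartial2Buffer() { return new long[]{0L}; }

    static void iterate(long[] p1, long row)   { p1[0] += row; }     // raw input rows only
    static long terminatePartial(long[] p1)    { return p1[0]; }
    static void merge(long[] p2, long partial) { p2[0] += partial; } // partial results only
    static long terminate(long[] p2)           { return p2[0]; }

    public static void main(String[] args) {
        long[] p1 = createPartial1Buffer();                      // step 1
        for (long row : Arrays.asList(7L, 8L)) iterate(p1, row); // step 2
        long[] p2 = createPartial2Buffer();
        merge(p2, terminatePartial(p1));                         // step 3
        System.out.println(terminate(p2));                       // step 4 -> 15
    }
}
```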
to make it clear, the partial1 buffer can only be used in iterate to consume records, and the partial2 buffer can only be used in merge to consume buffers, do I understand it correctly?
We're now going ahead for Spark 3.0. Do we still need to support this Hive UDAF case? Sorry if I am asking a question that's already answered somewhere.
@cloud-fan Your other PR fixes the Hive UDAF bug, but the issue with SortBasedAggregator, where it was calling merge on the aggregate buffer without initializing it, still remains, and this PR addresses it. I will go ahead and clean up this PR to put in only the fix for the above bug. Thank you.
## What changes were proposed in this pull request?
Hive UDAF knows the aggregation mode when creating the aggregation buffer, so that it can create different buffers for different inputs: the original data or the aggregation buffer. Please see an example in the [sketches library](https://github.com/DataSketches/sketches-hive/blob/7f9e76e9e03807277146291beb2c7bec40e8672b/src/main/java/com/yahoo/sketches/hive/cpc/DataToSketchUDAF.java#L107). However, the Hive UDAF adapter in Spark always creates the buffer with partial1 mode, which can only deal with one input: the original data. This PR fixes it. All credits go to pgandhi999, who investigated the problem, studied the Hive UDAF behaviors, and wrote the tests. close #23778
## How was this patch tested?
a new test
Closes #24144 from cloud-fan/hive. Lead-authored-by: pgandhi <[email protected]> Co-authored-by: Wenchen Fan <[email protected]> Signed-off-by: gatorsmile <[email protected]> (cherry picked from commit a6c207c) Signed-off-by: gatorsmile <[email protected]>
…for SortBasedAggregate
Normally, the aggregate operations invoked on an aggregation buffer for User Defined Aggregate Functions (UDAF) follow an order like initialize(), update(), eval() or initialize(), merge(), eval(). However, after a certain threshold configurable by spark.sql.objectHashAggregate.sortBased.fallbackThreshold is reached, ObjectHashAggregate falls back to SortBasedAggregator, which invokes the merge or update operation without calling initialize() on the aggregate buffer.
## What changes were proposed in this pull request?
The fix here is to initialize aggregate buffers again when the fallback to the SortBasedAggregate operator happens.
## How was this patch tested?
The patch was tested as part of [SPARK-24935](https://issues.apache.org/jira/browse/SPARK-24935) as documented in PR #23778.
Closes #24149 from pgandhi999/SPARK-27207. Authored-by: pgandhi <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
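The fallback bug described in that commit can be sketched in miniature. This is illustrative only (class and method names are invented, not Spark's internals): the destination buffer must be initialized before merge is first called on it, which is what the guard below ensures:

```java
// Illustrative sketch (not Spark code) of the fallback fix: merging
// into a buffer that was never initialized would dereference null,
// so the buffer is initialized lazily before the first merge.
public class FallbackSketch {
    static long[] buffer;              // null until initialize() runs
    static boolean initialized = false;

    static void initialize() { buffer = new long[]{0L}; initialized = true; }

    static void merge(long partial) {
        // Without this guard, the first merge after fallback would
        // hit an uninitialized buffer (NullPointerException here).
        if (!initialized) initialize();
        buffer[0] += partial;
    }

    public static void main(String[] args) {
        merge(5L);   // would fail without the lazy-initialization guard
        merge(10L);
        System.out.println(buffer[0]); // 15
    }
}
```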
A user of sketches library reported an issue with HLL Sketch Hive UDAF that seems to be a bug in Spark or Hive. Their code runs fine in 2.1 but has an issue from 2.2 onwards.
For more details on the issue, you can refer to the discussion in the sketches-user list
On further debugging, we figured out that from 2.2 onwards, Spark hive UDAF provides support for partial aggregation, and has removed the functionality that supported complete mode aggregation (see SPARK-19060 and SPARK-18186).
Thus, instead of expecting update method to be called, merge method is called here which throws the exception as described in the forums above.
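The failure mode above can be sketched in isolation. This is illustrative only (not Spark/Hive code): a buffer built to consume raw input rows is handed the serialized partial result that merge passes, and the cast fails:

```java
// Illustrative sketch (not Spark/Hive code) of the mismatch: an
// update path that expects a raw Long row receives a serialized
// partial result instead, and the cast throws.
public class ModeMismatchSketch {
    // A partial1-style buffer update: expects a raw Long input row.
    static void update(long[] buf, Object input) {
        buf[0] += (Long) input; // ClassCastException if input is a partial blob
    }

    public static void main(String[] args) {
        long[] buf = new long[]{0L};
        Object partialBlob = new byte[]{1, 2, 3}; // what merge actually passes
        try {
            update(buf, partialBlob);
        } catch (ClassCastException e) {
            System.out.println("mode mismatch: buffer was built for raw rows");
        }
    }
}
```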
What changes were proposed in this pull request?
Created new abstract class HiveTypedImperativeAggregate, which is a framework for Hive-related aggregation functions.
Also, there seems to be a bug in SortBasedAggregator where it was calling merge on the aggregate buffer without initializing it. Have fixed it in this PR.
How was this patch tested?
The steps to reproduce the above issue have been stated in the google group link posted above but will repeat them here for convenience:
1. Download the following three jars from the maven repository in here.
2. Launch spark-shell by adding the above jars in the driver as well as executor classpath and run the following commands:
3. You will see the following exception below:
4. After the code changes in this PR, run the same test as above and it should work.