-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-24208][SQL] Fix attribute deduplication for FlatMapGroupsInPandas #21737
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@mgaido91, are you able to add a test in Python side too? |
|
@HyukjinKwon sure, I am adding it, thanks. |
|
Test build #92762 has finished for PR 21737 at commit
|
|
Test build #92760 has finished for PR 21737 at commit
|
|
Test build #92764 has finished for PR 21737 at commit
|
| (oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions))) | ||
|
|
||
| case oldVersion @ FlatMapGroupsInPandas(_, _, output, _) | ||
| if AttributeSet(output).intersect(conflictingAttributes).nonEmpty => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not using oldVersion.outputSet?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @maryannxue Deduplicating on conflicting attributes in this function is easily broken. In the long term, this is not the perfect way to handle it. We should consider to fundamentally fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile I agree with you. Moreover, there are other possible problems in having the same expressions (with same exprId) in different part of a tree (please see SPARK-24051). So probably on long term we can add a specific rule for addressing this problem (extending/generalizing what I tried to do in SPARK-24051). What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to ensure all the expressions have unique IDs, instead of deduplicating it when we hit conflicts.
|
Test build #92804 has finished for PR 21737 at commit
|
|
retest this please |
|
Test build #92812 has finished for PR 21737 at commit
|
|
retest this please |
|
Test build #92821 has finished for PR 21737 at commit
|
|
LGTM Thanks! Merged to master/2.3 |
A self-join on a dataset which contains a `FlatMapGroupsInPandas` fails because of duplicate attributes. This happens because we are not dealing with this specific case in our `dedupAttr` rules. The PR fix the issue by adding the management of the specific case added UT + manual tests Author: Marco Gaido <[email protected]> Author: Marco Gaido <[email protected]> Closes #21737 from mgaido91/SPARK-24208. (cherry picked from commit ebf4bfb) Signed-off-by: Xiao Li <[email protected]>
| 'mixture.*aggregate function.*group aggregate pandas UDF'): | ||
| df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect() | ||
|
|
||
| def test_self_join_with_pandas(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just realized this test is in a wrong class. This should be moved to GroupedMapPandasUDFTests
| datasetWithUDF.unpersist(true) | ||
| } | ||
|
|
||
| test("SPARK-24208: analysis fails on self-join with FlatMapGroupsInPandas") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test case should be rewritten and moved to AnalysisSuite
|
@mgaido91 Since 2.3.2 release will be out soon, I merge this fix to 2.3 branch. Regarding the comments of the test cases, could you submit a follow-up PR? |
|
|
||
| df = self.spark.createDataFrame([Row(key=1, col='A'), Row(key=1, col='B'), | ||
| Row(key=2, col='C')]) | ||
| dfWithPandas = df.groupBy('key').apply(dummy_pandas_udf) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: dfWithPandas -> df_with_pandas
What changes were proposed in this pull request?
A self-join on a dataset which contains a
FlatMapGroupsInPandasfails because of duplicate attributes. This happens because we are not dealing with this specific case in ourdedupAttrrules.The PR fix the issue by adding the management of the specific case
How was this patch tested?
added UT + manual tests