[SPARK-24495][SQL] EnsureRequirement returns wrong plan when reordering equal keys #21529
Conversation
```scala
    currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
  val leftKeysBuffer = ArrayBuffer[Expression]()
  val rightKeysBuffer = ArrayBuffer[Expression]()
  val alreadyUsedIndexes = mutable.Set[Int]()
```
nit: maybe pickedIndexes?
good catch! thanks!
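As a minimal, self-contained sketch of the reordering logic under review (plain Scala, not Spark's actual code): each expected key must be matched to a distinct position of the join keys, tracking picked positions so that a key appearing twice in the join condition consumes two different positions. The name `pickedIndexes` follows the naming suggested above; everything else is illustrative.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Reorder (leftKeys, rightKeys) so that leftKeys follows expectedOrderOfKeys,
// never matching the same position twice even for duplicated keys.
def reorderKeys[K](
    leftKeys: Seq[K],
    rightKeys: Seq[K],
    expectedOrderOfKeys: Seq[K]): (Seq[K], Seq[K]) = {
  val leftKeysBuffer = ArrayBuffer[K]()
  val rightKeysBuffer = ArrayBuffer[K]()
  val pickedIndexes = mutable.Set[Int]()
  expectedOrderOfKeys.foreach { expected =>
    // First matching position that has not been picked yet.
    val idx = leftKeys.indices
      .find(i => !pickedIndexes.contains(i) && leftKeys(i) == expected)
      .getOrElse(sys.error(s"key $expected not found"))
    pickedIndexes += idx
    leftKeysBuffer += leftKeys(idx)
    rightKeysBuffer += rightKeys(idx)
  }
  (leftKeysBuffer.toSeq, rightKeysBuffer.toSeq)
}
```

With a duplicated key, `reorderKeys(Seq("a", "a"), Seq("b", "c"), Seq("a", "a"))` keeps both right keys instead of matching position 0 twice.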
Test build #91666 has finished for PR 21529 at commit
```scala
withSQLConf(("spark.sql.shuffle.partitions", "1"),
  ("spark.sql.constraintPropagation.enabled", "false"),
  ("spark.sql.autoBroadcastJoinThreshold", "-1")) {
  val df1 = spark.range(100)
```
we should make sure range has more than one partition, otherwise we can't reproduce the bug.
I was surprised that this bug is present even if the table is not bucketed, and then I found out another problem in the code. This gives the bug a much larger impact: when we do a shuffle join, we add a shuffle exchange to both of the join sides, which then triggers the reordering. @mgaido91 let's also remove the transformUp. also cc @tejasapatil
one followup: we should improve our golden file SQL tests to run the same queries with different configs. This is pretty important for the join tests; otherwise we only test broadcast join.
thanks for your review @cloud-fan. Nice catch on the transformUp issue. As far as the followup is concerned, we should decide whether we want to support the possibility that different configs produce different results. In the second case we have more flexibility, but we also have to be much more careful not to introduce bugs when different configs should not produce different results. Moreover, we would also have many more golden files...
Test build #91705 has finished for PR 21529 at commit
retest this please
Test build #91710 has finished for PR 21529 at commit
```scala
test("SPARK-24495: EnsureRequirements can return wrong plan when reusing the same key in join") {
  withSQLConf(("spark.sql.shuffle.partitions", "1"),
```
let's not use hard-coded config names, use SQLConf.SHUFFLE_PARTITIONS instead.
a nit: we usually use `a -> b, c -> d, ...` to specify config pairs.
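As a hypothetical sketch (not Spark's actual implementation) of how a withSQLConf-style helper works with the `key -> value` pair convention suggested above: set the pairs, run the body, then restore the previous values, whether or not the body throws.

```scala
import scala.collection.mutable

// Stand-in for the session's SQL conf; illustrative only.
val conf = mutable.Map[String, String]()

def withConf[T](pairs: (String, String)*)(body: => T): T = {
  // Remember the previous value (if any) of every key we are about to set.
  val previous = pairs.map { case (k, _) => k -> conf.get(k) }
  pairs.foreach { case (k, v) => conf(k) = v }
  try body
  finally previous.foreach {
    case (k, Some(old)) => conf(k) = old
    case (k, None)      => conf.remove(k)
  }
}
```

Usage then reads `withConf("spark.sql.shuffle.partitions" -> "1", "spark.sql.autoBroadcastJoinThreshold" -> "-1") { ... }`, matching the arrow-pair style.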
```scala
withSQLConf(("spark.sql.shuffle.partitions", "1"),
  ("spark.sql.constraintPropagation.enabled", "false"),
  ("spark.sql.autoBroadcastJoinThreshold", "-1")) {
  val df1 = spark.range(100).repartition(2, $"id", $"id")
```
a simpler way: `spark.range(0, 100, 1, numPartitions = 2)`
this way would not be ok, as we would have a RangePartitioning while the issue appears only with HashPartitioning
no, the issue can happen with range partitioning (because of the double transformation issue); the code in the ticket can reproduce the bug and it has no hash partitioning.
yes, but if we remove the transformUp as you correctly suggested, then without HashPartitioning we do not test the proper behavior of the reorder method.
Anyway, I added another test to PlannerSuite which checks the behavior of the reorder method, so I will follow your suggestion here, thanks.
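For intuition on the distinction discussed above, here is a toy sketch (plain Scala, not Spark's Partitioning classes) of the two schemes: hash partitioning routes a row by the hash of its key(s), while range partitioning routes it by which ordered range the key falls into. The hashing formula and bound handling are illustrative assumptions, not Spark's.

```scala
// Toy hash partitioner: combine the key hashes, map into [0, numPartitions).
def hashPartition(keys: Seq[Long], numPartitions: Int): Int = {
  val h = keys.foldLeft(17)((acc, k) => 31 * acc + k.hashCode)
  ((h % numPartitions) + numPartitions) % numPartitions
}

// Toy range partitioner: upperBounds are sorted split points; the last
// partition is unbounded above.
def rangePartition(key: Long, upperBounds: Seq[Long]): Int = {
  val i = upperBounds.indexWhere(key <= _)
  if (i == -1) upperBounds.length else i
}
```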
```scala
  ("spark.sql.constraintPropagation.enabled", "false"),
  ("spark.sql.autoBroadcastJoinThreshold", "-1")) {
  val df1 = spark.range(100).repartition(2, $"id", $"id")
  val df2 = spark.range(100).select(($"id" * 2).as("b1"), (- $"id").as("b2"))
```
`($"id" * 2).as("b1")` -> `$"id".as("b1")`, to minimize the test.
I think we need to support both: testing different physical operators needs the same result, while testing something like type coercion mode needs different results. Anyway let's discuss it in the followup.
@mgaido91 Could you help improve the test coverage of joins?
ok @cloud-fan, I'll try and send a proposal in the next days.
Sure, @gatorsmile, I am happy to. Do you mean running the existing tests for every type of join, or do you have something different in mind? Thanks.
I think for this PR, apart from the end-to-end test for checking the result, we should also have a unit test in PlannerSuite.
```scala
outputPlan match {
  case SortMergeJoinExec(leftKeys, rightKeys, _, _, _, _) =>
    assert(leftKeys == Seq(exprA, exprA))
    assert(rightKeys.contains(exprB) && rightKeys.contains(exprC))
```
is it better to check `rightKeys == Seq(exprB, exprC)`?
```scala
  SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
  val df1 = spark.range(0, 100, 1, 2)
  val df2 = spark.range(100).select($"id".as("b1"), (- $"id").as("b2"))
  val res = df1.join(df2, $"id" === $"b1" && $"id" === $"b2")
```
one difference between this test and the code in the JIRA ticket is that the code in the JIRA ticket has a Project above the join, to trigger the double transformation issue. We should add a Project and make sure this test does fail without this patch.
```scala
test("SPARK-24495: EnsureRequirements can return wrong plan when reusing the same key in join") {
```
as an end-to-end test, maybe a better name is: `SPARK-24495: Join may return wrong result when having duplicated equal-join keys`
LGTM except some minor comments about the tests
Test build #91773 has finished for PR 21529 at commit
gatorsmile left a comment:
LGTM
@mgaido91 can you fix the conflicts? thanks!
Test build #91790 has finished for PR 21529 at commit
retest this please
Test build #91797 has finished for PR 21529 at commit
Test build #91826 has finished for PR 21529 at commit
retest this please
Test build #91843 has finished for PR 21529 at commit
Thanks! Merged to master/2.3
[SPARK-24495][SQL] EnsureRequirement returns wrong plan when reordering equal keys

`EnsureRequirement` in its `reorder` method currently assumes that the same key appears only once in the join condition. This of course might not be the case, and when it is not satisfied, it returns a wrong plan which produces a wrong result of the query.

added UT

Author: Marco Gaido <[email protected]>

Closes #21529 from mgaido91/SPARK-24495.

(cherry picked from commit fdadc4b)
Signed-off-by: Xiao Li <[email protected]>
Thanks @gatorsmile. Sorry, may I ask what you think about #21529 (comment)? Thanks.
Adding new queries to
What changes were proposed in this pull request?

`EnsureRequirement` in its `reorder` method currently assumes that the same key appears only once in the join condition. This of course might not be the case, and when it is not satisfied, it returns a wrong plan which produces a wrong result of the query.

How was this patch tested?
added UT
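To make the bug described above concrete, here is a hypothetical plain-Scala sketch (not Spark's code) of a reorder that assumes each key appears only once: matching by `indexOf` always returns the first occurrence, so with a duplicated left key the corresponding right keys are also duplicated and one equality condition is silently lost.

```scala
// Buggy sketch: assumes each join key appears only once.
def naiveReorder[K](
    leftKeys: Seq[K],
    rightKeys: Seq[K],
    expectedOrderOfKeys: Seq[K]): (Seq[K], Seq[K]) = {
  // indexOf always finds the FIRST occurrence, even for duplicated keys.
  val picked = expectedOrderOfKeys.map(leftKeys.indexOf(_))
  (picked.map(leftKeys), picked.map(rightKeys))
}

// With a join condition like id === b1 && id === b2, leftKeys = (id, id) and
// rightKeys = (b1, b2); the naive reorder matches position 0 twice, so the
// b2 condition disappears from the reordered keys.
val (l, r) = naiveReorder(Seq("id", "id"), Seq("b1", "b2"), Seq("id", "id"))
```

Here `l` is `Seq("id", "id")` but `r` collapses to `Seq("b1", "b1")`, which is exactly how the wrong plan produces wrong query results.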