[SPARK-25951][SQL] Ignore aliases for distributions and orderings #22957
|
Test build #98521 has finished for PR 22957 at commit
|
|
Test build #98552 has finished for PR 22957 at commit
|
|
i didn't look at your new code, but is your old code safe? e.g. a project that depends on the new alias. |
|
Thanks for your comment @rxin. It was safe for comparisons (I mean to say: these 2 expressions return the same data), because anyway all the |
| * different output expressions can evaluate to the same result as well (eg. when an expression | ||
| * is aliased). | ||
| */ | ||
| def sameResult(other: Expression): Boolean = other match { |
I know it's always safer to introduce a new API, but is it really necessary? In Canonicalize, we erase the name for attributes, I think it's reasonable to erase the name of Alias too, as it doesn't affect the output.
That is reasonable, but it doesn't solve the problem stated in the JIRA. The goal here is to avoid that something like a as b is considered different from a in terms of ordering/distribution. If we just erase the name of the alias, the 2 expressions would still be different, because the presence of the Alias node itself makes them different.
"erase the name" can also mean remove Alias. If we can't clearly tell the difference between semanticEquals and sameResult, and give a guideline about using which one in which case, I think we should just update semanticEquals(i.e. Canonicalize).
Removing Alias is not possible for the reason explained in #22957 (comment). In general, semanticEquals should be used when we want to replace an expression with another, while sameResult should be used in order to check that 2 expressions return the same output.
Can you put it in the method doc (both semanticEquals and sameResult)? This makes sense to me.
Sure, thanks.
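To make the distinction discussed in this thread concrete, here is a minimal sketch on a toy expression tree. The names Expr, Attr, Add and Alias are hypothetical stand-ins written for this example and are not Catalyst's classes; plain structural equality plays the role of semanticEquals, whereas the real method compares canonicalized expressions.

```scala
// Toy model for illustration only; these are NOT Spark's Catalyst classes.
object SameResultSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  // Remove every Alias node, at any depth of the tree.
  def trimAliases(e: Expr): Expr = e match {
    case Alias(child, _) => trimAliases(child)
    case Add(l, r)       => Add(trimAliases(l), trimAliases(r))
    case a: Attr         => a
  }

  // Stand-in for semanticEquals: structural equality, so an Alias wrapper matters.
  def semanticEquals(a: Expr, b: Expr): Boolean = a == b

  // Stand-in for sameResult: compare after stripping aliases on both sides.
  def sameResult(a: Expr, b: Expr): Boolean =
    semanticEquals(trimAliases(a), trimAliases(b))
}
```

With this model, `a as b` and `a` are not interchangeable in a plan (semanticEquals is false) but still compute the same data (sameResult is true), which is the guideline given above.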
| */ | ||
| def sameResult(other: Expression): Boolean = other match { | ||
| case a: Alias => sameResult(a.child) | ||
| case _ => this.semanticEquals(other) |
can we also strip the alias of this here? so that we can mark sameResult as final.
I think it is doable, but I didn't want to put too many match where it was not needed. But if you prefer that way I can try and do that.
Well, it needs to be overridden by HashPartitioning too, so since I am not able to make it final anyway, I don't think it is a good idea. I could add a match on HashPartitioning too, but it doesn't seem a clean solution to me.
we can do
CleanupAliases.trimAliases(this) semanticEquals CleanupAliases.trimAliases(other)
|
Test build #99377 has finished for PR 22957 at commit
|
|
Test build #99376 has finished for PR 22957 at commit
|
|
Test build #99375 has finished for PR 22957 at commit
|
|
retest this please |
|
Test build #99424 has finished for PR 22957 at commit
|
|
Test build #99446 has finished for PR 22957 at commit
|
|
retest this please |
|
Test build #99448 has finished for PR 22957 at commit
|
| e.transformDown { | ||
| case Alias(child, _) => child | ||
| case MultiAlias(child, _) => child | ||
| case Alias(child, _) => trimAliases(child) |
what's going on here?
The point is that this method currently removes only the first Alias it finds (and it doesn't go on recursively), which is the reason for the UT failure. Also, judging from the comment on the method, this does not seem to be its expected behavior.
it's transformDown, why doesn't it work?
ah, I did a stupid thing here. The problem is that, since the rule returns child for this node, transformDown then applies the rule to child's children instead of applying it to child itself. So the problem here is with 2 consecutive Alias nodes. Let me find a better fix.
just using transformUp solves the issue
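The traversal-order issue in this thread can be reproduced on a toy tree. The sketch below is an assumption-laden model, not Catalyst's TreeNode: transformDown rewrites a node and then descends into the children of the rewritten node, while transformUp rewrites the children first. Expr, Attr, Add, Alias and dropAlias are hypothetical names introduced for the example.

```scala
// Toy model for illustration only; these are NOT Spark's Catalyst classes.
object TransformSketch {
  sealed trait Expr {
    def mapChildren(f: Expr => Expr): Expr = this match {
      case Alias(c, n) => Alias(f(c), n)
      case Add(l, r)   => Add(f(l), f(r))
      case a: Attr     => a
    }
    // Pre-order: rewrite this node, then descend into the children of the result.
    def transformDown(rule: PartialFunction[Expr, Expr]): Expr = {
      val afterRule = rule.applyOrElse(this, identity[Expr])
      afterRule.mapChildren(_.transformDown(rule))
    }
    // Post-order: rewrite the children first, then this node.
    def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
      val afterChildren = mapChildren(_.transformUp(rule))
      rule.applyOrElse(afterChildren, identity[Expr])
    }
  }
  final case class Attr(name: String) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  // The rule under discussion: replace an Alias by its child.
  val dropAlias: PartialFunction[Expr, Expr] = { case Alias(child, _) => child }
}
```

For Alias(Alias(a, "x"), "y"), the pre-order traversal replaces the outer Alias with the inner one and then only visits the inner Alias's child, so one Alias survives; the post-order traversal removes both.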
|
Test build #99449 has finished for PR 22957 at commit
|
|
Test build #99462 has finished for PR 22957 at commit
|
| } | ||
| } | ||
|
|
||
| test("SPARK-25951: avoid redundant shuffle on rename") { |
can we have an end-to-end test as well?
+1 if possible.
ah, good point and indeed very useful. In my previous tests I always used a very simple query to verify this and never the one reported in the JIRA. Now I tried that one and I realized that this fix is not very useful as of now, because with a rename like that, the HashPartitioning contains the AttributeReference to the Alias, rather than the Alias itself. Since that is the common case, the PR as it is now is not very useful. If I am not able to figure out a good way to handle that, I am going to close this. Thanks and sorry for the trouble.
@cloud-fan @viirya I added the test, but as I mentioned I had to make another change in order to make it work. Sorry for the mistake. I'd really appreciate it if you could review it again. Thanks.
|
LGTM, cc @viirya as well |
|
|
||
| /** | ||
| * Returns true when two expressions will always compute the same result, even if the output may | ||
| * be different, because of different names or similar differences. |
I think here output is a bit confusing. Do we mean the output names?
So sameResult returns if the evaluated results between two expressions are exactly the same?
yes, I mean: sameResult returns true if 2 expressions return the same data even though from plan perspective they are not the same (eg. the output name/exprIds is different as in this case), while semanticEquals ensure they are the same from plan perspective too. If you have better suggestions how to rephrase this, I am happy to improve it. Thanks.
How about replace output with output from plan perspective?
Returns true when two expressions will always compute the same result, even if the output
from plan perspective may be different, because of different names or similar differences.
|
This looks good to me. Just a comment about wording. |
|
Sorry for jumping in, but it looks like we missed #17400 (SPARK-19981), which seems to contain the fix that @cloud-fan suggests - reviewers just lost focus on that. To me #17400 looks very concise, so I'm curious how this patch is different from #17400. |
|
@HeartSaVioR thanks for pointing that out! Yes, that fix is similar to this, the main difference between these 2 PRs is that this one handles also the case when we have |
|
@maropu thanks for checking this. Do you mean using the trait approach? If so, sure, I am doing. If not, please let me know. Thanks. |
|
Test build #102147 has finished for PR 22957 at commit
|
|
Test build #102150 has finished for PR 22957 at commit
|
|
Test build #102154 has finished for PR 22957 at commit
|
|
Just 2 cents, personally I understood @maropu's comment as taking up his PR, in other words, rebasing this branch to his branch to retain his commits, and adding @mgaido91 commit on top of his branch to fix up remaining issue. This would make giving authorship of this PR to both of @maropu and @mgaido91 easier (looks correct way to give credits on actual works). |
|
Thanks, @mgaido91 and @HeartSaVioR! The fix @mgaido91 did looks ok to me. But I haven't reviewed the latest version yet; I'll do so in a few days. |
| * caused by the rename of an attribute among the partitioning ones, eg. | ||
| * | ||
| * spark.range(10).selectExpr("id AS key", "0").repartition($"key").write.saveAsTable("df1") | ||
| * spark.range(10).selectExpr("id AS key", "0").repartition($"key").write.saveAsTable("df2") |
do you mean view here? otherwise the physical plan doesn't match.
|
@HeartSaVioR @maropu rebasing this PR with @maropu 's change and work on it is non-trivial, I can rather close this and create a new one based on @maropu 's branch if you prefer. Otherwise committers can add @maropu as author when this will be merged or @maropu can take this over and create a new PR based on this. Just let me know how you prefer to go on, thanks. |
|
|
||
| override private[spark] def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = { | ||
| if (this.references.contains(invalidAttr)) { | ||
| UnknownPartitioning(numPartitions) |
Let's add comments to explain it.
HashPartitioning('a, 'b) with output expressions 'a as 'a1, should produce UnknownPartitioning instead of HashPartitioning('a1), which is wrong.
|
|
||
| override private[spark] def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = { | ||
| if (this.references.contains(invalidAttr)) { | ||
| val validExprs = this.children.takeWhile(!_.references.contains(invalidAttr)) |
this.children -> ordering?
| if (validExprs.isEmpty) { | ||
| UnknownPartitioning(numPartitions) | ||
| } else { | ||
| RangePartitioning(validExprs, numPartitions) |
think about RangePartitioning('a.ASC, 'b.ASC) with output expression 'a as 'a1.
It cannot satisfy ClusteredDistribution('a1), but can still satisfy OrderedDistribution('a1.ASC). I think the expected result should be RangePartitioning('a1.ASC, 'b.ASC) instead of RangePartitioning('a1.ASC), which is wrong.
Why doesn't it satisfy ClusteredDistribution('a1)? I don't agree with what you stated. If b is not in the output, it is useless to have it there. Moreover, when ordering we always order by the first attribute, then by the second, and so on. So if something is partitioned by RangePartitioning('a1.ASC, 'b.ASC), it is also true that its partitioning is RangePartitioning('a1.ASC). So I think that in that case RangePartitioning('a1.ASC) is the right one.
according to RangePartitioning.satisfies0, and the classdoc of RangePartitioning and ClusteredDistribution, RangePartitioning('a.ASC, 'b.ASC) does not satisfy ClusteredDistribution('a).
mmmh, I am not sure what you mean by referring to the classdoc of these 2 classes. I see nothing there about this. Anyway, I see that the implementation does what you state, but I do believe that it is wrong (or at least suboptimal if you prefer). If the data is partitioned by sorting it with a.ASC, b.ASC, it is definitely partitioned by sorting it with a.ASC. I think that forall should be an exists. There is also a (very minor) bug in the current implementation; try running this test (it fails...):
test("partitioning test") {
  val attr1 = AttributeReference("attr1", IntegerType)()
  val attr2 = AttributeReference("attr2", IntegerType)()
  val partitioning = RangePartitioning(Seq.empty, 10)
  val requiredDistribution = ClusteredDistribution(Seq(attr2, attr1), Some(10))
  assert(!partitioning.satisfies(requiredDistribution))
}
please carefully read the classdoc of RangePartitioning and ClusteredDistribution, and see what RangePartitioning guarantees and what ClusteredDistribution requires.
partition 1: (a=1, b=2), (a=1, b=3)
partition 2: (a=1, b=4), (a=1, b=5)
This data set is range partitioned by a,b, but not clustered by a.
ah I see now, thanks. Let me update it then accordingly, thanks.
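The counterexample in this thread can be checked mechanically. The sketch below is a toy model and does not use Spark's Partitioning or Distribution classes: Row, isClusteredBy and isRangePartitionedByAB are hypothetical helpers written for this example. It assumes "clustered by a key" means no key value appears in more than one partition, and "range partitioned by (a, b)" means every row of a partition sorts no later than every row of the following partition.

```scala
// Toy model for illustration only; it does not use Spark's partitioning classes.
object ClusteringSketch {
  final case class Row(a: Int, b: Int)

  // True when no key value appears in more than one partition.
  def isClusteredBy[K](partitions: Seq[Seq[Row]])(key: Row => K): Boolean = {
    val keysPerPartition = partitions.map(_.map(key).toSet)
    val total = keysPerPartition.map(_.size).sum
    // If the union is as large as the sum of sizes, no key is shared.
    keysPerPartition.flatten.toSet.size == total
  }

  // True when each partition's largest (a, b) is <= the next partition's smallest.
  def isRangePartitionedByAB(partitions: Seq[Seq[Row]]): Boolean = {
    val keys = partitions.filter(_.nonEmpty).map(_.map(r => (r.a, r.b)))
    val ord = Ordering[(Int, Int)]
    keys.zip(keys.drop(1)).forall { case (prev, next) => ord.lteq(prev.max, next.min) }
  }

  // The data set from the comment above.
  val example: Seq[Seq[Row]] = Seq(
    Seq(Row(1, 2), Row(1, 3)),
    Seq(Row(1, 4), Row(1, 5))
  )
}
```

On the example data, the range-partitioning check on (a, b) holds, but the value a=1 is spread over both partitions, so the data is not clustered by a.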
| final override def outputPartitioning: Partitioning = { | ||
| child.outputPartitioning match { | ||
| case partitioning: Expression => | ||
| val exprToEquiv = partitioning.references.map { attr => |
can you explain what's going on here? The code is a little hard to follow.
sure, let me add some comments. Thanks.
|
Test build #102193 has finished for PR 22957 at commit
|
|
Test build #102201 has finished for PR 22957 at commit
|
|
Test build #104484 has finished for PR 22957 at commit
|
|
@cloud-fan @maropu sorry for pinging you again, I think I addressed all your comments on this, could you please check it again? Thanks. |
|
We're closing this PR because it hasn't been updated in a while. If you'd like to revive this PR, please reopen it! |
What changes were proposed in this pull request?
When we canonicalize an Expression, we do not remove Alias. So two expressions which are the same but are renamed are considered to be semantically different. As we rely on semantic equality in order to check if the result is the same for 2 expressions, some optimizations, as shown in the JIRA, may fail to apply when a column is renamed, eg. removing redundant shuffles.
The PR proposes to ignore Aliases when checking whether distributions and orderings are satisfied by introducing a new method sameResults which ignores Aliases.
Credit should be given to @maropu for the approach suggestion which follows #17400.
Closes #17400.
How was this patch tested?
added UT