[SPARK-21865][SQL] simplify the distribution semantic of Spark SQL #19080
Conversation
this fixes a potential bug: the operator between the upper and lower shuffle may have its own partitioning, which means the upper shuffle is still necessary to change the partitioning. We don't have this kind of operator now but we may have it in the future.
defaultPartitioning -> targetPartitioning?
Any better name? satisfiedPartitioning?
Given that numPartitions only makes sense for hash and ranged partitions, is there a need to specify it as a hint in the default trait? Can we not just make it a parameter for the relevant partitioning scheme(s)?
numPartitions is already a parameter of the relevant partitioning scheme(s). Here we need a way to create a partitioning scheme for a given distribution, and we need this numPartitions as a hint for that.
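To illustrate the idea, here is a minimal toy sketch (simplified types and hypothetical signatures, not the actual Spark classes): numPartitions stays a parameter of the concrete partitionings, while a distribution only carries an optional hint that is consulted when the planner has to create a partitioning for it.

```scala
// Toy sketch, not the actual Spark trait.
sealed trait Partitioning { def numPartitions: Int }
case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning

sealed trait Distribution {
  def requiredNumPartitions: Option[Int]                          // hint only
  def createPartitioning(defaultNumPartitions: Int): Partitioning // used when a shuffle is needed
}

case class ClusteredDistribution(
    clustering: Seq[String],
    requiredNumPartitions: Option[Int] = None) extends Distribution {
  override def createPartitioning(defaultNumPartitions: Int): Partitioning =
    HashPartitioning(clustering, requiredNumPartitions.getOrElse(defaultNumPartitions))
}
```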
- +1 for satisfiedPartitioning
- numPartitions => numPartitionsHint, so that the loose semantics gets highlighted
Test build #81221 has finished for PR 19080 at commit
Should we have a needCoPartition() method for distributions?
I don't think so. It's kind of a blacklist mechanism; only 2 special distributions don't need co-location.
At a conceptual level, this feels weird. One could ask: why is any random partitioning NOT able to satisfy UnspecifiedDistribution?
I liked your doc in SparkPlan.scala. Feels like it should be here, OR there should be some reference to it over here. It will be easy for people to get lost trying to reason about what's going on here :)
Maybe not related to this change, but a question: even though maxChildrenNumPartitions doesn't require us to shuffle all children, if it requires shuffling most of the children it's still a bad choice for the number of partitions, right?
Ideally we should pick the most frequent numPartitions, but for now the only operator needing co-partition is SortMergeJoinExec, which only has 2 children, so picking the max one is fine. We can revisit this later.
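For illustration, a tiny plain-Scala comparison of the two strategies on made-up child partition counts (not planner code from the PR):

```scala
// Example child partition counts; two children already agree on 10 partitions.
val childrenNumPartitions = Seq(10, 10, 200)

// Picking the max: both 10-partition children must be reshuffled to 200 partitions.
val maxChoice = childrenNumPartitions.max

// Picking the most frequent value: only the 200-partition child must be reshuffled to 10.
val mostFrequent = childrenNumPartitions
  .groupBy(identity)
  .maxBy { case (_, occurrences) => occurrences.size }
  ._1
```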
From needCoPartitionChildIndex, it looks like UnspecifiedDistribution is also not counted as one of the co-partition distributions?
I mentioned this at the beginning of the doc: "provides specific distribution requirements for more than one child", where "specific distribution" excludes UnspecifiedDistribution. I'll make it more explicit.
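Roughly what that exclusion could look like, as a toy sketch (simplified types; the actual needCoPartitionChildIndex helper in the PR may differ):

```scala
// Only children whose required distribution is a "specific" one take part in the
// co-partition check; UnspecifiedDistribution is excluded.
sealed trait Distribution
case object UnspecifiedDistribution extends Distribution
case class ClusteredDistribution(clustering: Seq[String]) extends Distribution

def childrenThatNeedCoPartition(requiredChildDistribution: Seq[Distribution]): Seq[Int] =
  requiredChildDistribution.zipWithIndex.collect {
    case (distribution, index) if distribution != UnspecifiedDistribution => index
  }

// Example: a binary operator requiring a specific distribution only from its second child.
val indexes = childrenThatNeedCoPartition(
  Seq(UnspecifiedDistribution, ClusteredDistribution(Seq("a"))))
// indexes == Seq(1)
```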
also cc @marmbrus

Test build #81264 has finished for PR 19080 at commit
SingleNodeDistribution?
So I spoke with @sameeragarwal about this a little. The whole point here was to have a logical / physical separation (Distribution and Partitioning respectively). AllTuples could be SingleNode or it could be Broadcast. All the operator wants to know is that it's seeing all of the tuples; it shouldn't care about how that is accomplished.
Now, since the first version, we have started to deviate from that. I'm not sure if this is still the right thing to do, but I wanted to give a little context.
I think the concern about logical/physical separation is only useful if we take CBO into consideration. E.g., for an AllTuples distribution requirement, the planner may produce two plans using SinglePartition and BroadcastPartitioning respectively and pick the cheaper one. In the scope of our current planner framework, this separation doesn't seem to be very useful, though.
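For context, a minimal toy sketch of that separation (simplified types, not Spark's actual Distribution/Partitioning classes):

```scala
// One logical requirement, AllTuples, can be met by two different physical strategies,
// and a cost-based planner could generate a plan for each and pick the cheaper one.
sealed trait Distribution
case object AllTuples extends Distribution

sealed trait Partitioning { def satisfies(required: Distribution): Boolean }
case object SinglePartition extends Partitioning {
  // all tuples end up in a single partition
  override def satisfies(required: Distribution): Boolean = required == AllTuples
}
case object BroadcastPartitioning extends Partitioning {
  // every node sees a full copy of the data
  override def satisfies(required: Distribution): Boolean = required == AllTuples
}

// Candidate physical layouts for an operator that requires AllTuples from its child:
val candidates: Seq[Partitioning] =
  Seq(SinglePartition, BroadcastPartitioning).filter(_.satisfies(AllTuples))
```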
Sure you can make that argument (I'm not sure I buy it), but then why does this PR still have two different concepts?
It seems we should be moving towards CBO, not away from it.
I'd like to keep AllTuples. SingleNodeDistribution is a special case of AllTuples, and it seems we do not really need the extra information introduced by SingleNode.
I'd like to keep the Distribution concept; otherwise it's very weird to say a RangePartitioning satisfies HashPartitioning, while it looks reasonable to say RangePartitioning satisfies ClusteredDistribution.
Yea, I did miss the point that ClusteredDistribution covers both RangePartitioning and HashPartitioning.
cc @yhuai
Have a question after reading the new approach. Let's say that we have a join like Also, regarding
Can you give a concrete example?
This is not true now. After this PR, join has a stricter distribution requirement called I think this is reasonable,
let's take join as an example. According to the definition of If
So my whole point of view is: co-partition is a really tricky requirement, and it's really hard to implicitly guarantee it during shuffle planning. We should have a weaker guarantee (same number of partitions), and let the operator itself achieve the co-partition requirement via this guarantee plus a special distribution requirement; a minimal sketch of the idea follows below. Also, in the future we may have operators that have distribution requirements for multiple children but don't need them to be co-partitioned.
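A minimal sketch of that idea (toy types and a hypothetical ToySortMergeJoin operator, using the HashClusteredDistribution name adopted later in this thread), showing how per-child hash-clustered requirements plus the same-number-of-partitions guarantee yield co-partitioned inputs:

```scala
// Toy types; the real Spark operator and traits have more members.
sealed trait Distribution
case class HashClusteredDistribution(expressions: Seq[String]) extends Distribution

trait ToyPlan { def requiredChildDistribution: Seq[Distribution] }

// A sort-merge-join-like operator asks each child for a hash-clustered layout on its own keys.
// Hashing the same number of key expressions, plus the planner's guarantee that all children
// end up with the same number of partitions, means matching keys land in the same partition
// index on both sides, i.e. the inputs are co-partitioned.
case class ToySortMergeJoin(leftKeys: Seq[String], rightKeys: Seq[String]) extends ToyPlan {
  override def requiredChildDistribution: Seq[Distribution] =
    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
}
```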
also cc @rxin. To support the "pre-shuffle" feature for data source v2, I need to create similar
While we are cleaning things up, this needs to be fixed. RangePartitioning(a+, b+) does not satisfy OrderedDistribution(a+): it violates the requirement that all values of a need to be in the same partition.
I noticed this too; the current logic only guarantees ordering but not clustering. But this is orthogonal to this PR and we can fix it in another PR.
BTW this doesn't cause any problems in practice, because OrderedDistribution is only used for the sort operator.
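To make the issue concrete, a hypothetical illustration in plain Scala (made-up rows and range boundary): range partitioning on (a ASC, b ASC) can split rows with the same value of a across partitions, even though the ordering across partitions stays correct, which is exactly the clustering guarantee a stronger OrderedDistribution(a ASC) would need.

```scala
val rows = Seq((1, 3), (1, 7), (2, 1))

// Suppose the range sampler happens to pick the boundary (a = 1, b = 5).
// Rows lexicographically <= (1, 5) go to partition 0, the rest to partition 1.
def beforeOrAtBoundary(row: (Int, Int)): Boolean =
  row._1 < 1 || (row._1 == 1 && row._2 <= 5)

val (partition0, partition1) = rows.partition(beforeOrAtBoundary)
// partition0 == Seq((1, 3))          -- one row with a = 1
// partition1 == Seq((1, 7), (2, 1))  -- another row with a = 1 lands here
```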
cc @rxin @JoshRosen @liancheng @sameeragarwal @gatorsmile @brkyvz any more comments?

Test build #82802 has finished for PR 19080 at commit

retest this please

Test build #84325 has finished for PR 19080 at commit

retest this please

Test build #85715 has finished for PR 19080 at commit
sameeragarwal left a comment
Just a few clarifying questions/comments about retaining the existing Distribution/Partitioning semantics, but overall LGTM for doing away with the compatibleWith semantics.
 * This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
 * number of partitions, this distribution strictly requires which partition the tuple should be in.
 */
case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution
Semantically, a Partitioning satisfies a Distribution, so it'd be better not to call this HashPartitioned. How about we call this DeterministicClusteredDistribution or HashClusteredDistribution? Also, perhaps this can just extend ClusteredDistribution?
Good idea, I'll rename it to HashClusteredDistribution. But I'd rather not extend ClusteredDistribution: if a partitioning can satisfy ClusteredDistribution, it may not be able to satisfy HashClusteredDistribution, so we couldn't substitute the parent with the child, which would violate the Liskov Substitution Principle.
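A toy illustration of that substitution argument (simplified types and a simplified satisfies check, not the actual Spark logic): a partitioning that satisfies ClusteredDistribution need not satisfy HashClusteredDistribution, so the stricter distribution cannot safely stand in wherever the looser one is accepted.

```scala
sealed trait Distribution
case class ClusteredDistribution(clustering: Seq[String]) extends Distribution
case class HashClusteredDistribution(expressions: Seq[String]) extends Distribution

sealed trait Partitioning { def satisfies(required: Distribution): Boolean }

// Range partitioning can satisfy ClusteredDistribution (its ordering columns are among the
// clustering keys), but it says nothing about which partition a given hash value maps to,
// so it cannot satisfy HashClusteredDistribution.
case class RangePartitioning(ordering: Seq[String], numPartitions: Int) extends Partitioning {
  override def satisfies(required: Distribution): Boolean = required match {
    case ClusteredDistribution(keys) => ordering.forall(keys.contains) // simplified check
    case _                           => false
  }
}
```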
sounds good
 * entire set of tuples is transformed into different data structure.
 */
- case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
+ case class BroadcastDistribution(mode: BroadcastMode) extends Distribution {
Similarly, how about BroadcastPartitioning just satisfying the AllTuples distribution?
Yea, good idea, but again, this is an existing problem; let's fix it in another PR.
// TODO: This is not really valid...
def clustering: Set[Expression] = ordering.map(_.child).toSet
override def requiredNumPartitions: Option[Int] = None
Out of curiosity, should an OrderedDistribution make any guarantees around clustering? Do we care if "tuples that share the same value for the ordering expressions will never be split across partitions"?
According to the comment "This is a strictly stronger guarantee than [[ClusteredDistribution]]", we want to guarantee it. However, we actually don't respect it; see https://github.com/apache/spark/pull/19080/files#r136419947
Since it is an existing problem, I'd like to fix it in another PR.
LGTM

Test build #85786 has finished for PR 19080 at commit

retest this please

Test build #85791 has finished for PR 19080 at commit

thanks, merging to master/2.3!
Author: Wenchen Fan <[email protected]>

Closes #19080 from cloud-fan/exchange.

(cherry picked from commit eb45b52)
Signed-off-by: Wenchen Fan <[email protected]>
…ning from children

## What changes were proposed in this pull request?

In #19080 we simplified the distribution/partitioning framework, and made all the join-like operators require `HashClusteredDistribution` from their children. Unfortunately the streaming join operator was missed. This can cause wrong results. Think about

```
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]
val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
```

The physical plan is

```
*(3) Project [a#5]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#10, b#11], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 54e31fce-f055-4686-b75d-fcd2b076f8d8, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(b#11, 5)
      +- *(2) Project [value#3 AS a#10, (value#3 * 2) AS b#11]
         +- StreamingRelation MemoryStream[value#3], [value#3]
```

The left table is hash partitioned by `a, b`, while the right table is hash partitioned by `b`. This means we may have a matching record that sits in different partitions, which should be in the output but is not.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #21587 from cloud-fan/join.

(cherry picked from commit dc8a6be)
Signed-off-by: Xiao Li <[email protected]>
## What changes were proposed in this pull request?

**The current shuffle planning logic**

1. Each operator specifies the distribution requirements for its children, via the `Distribution` interface.
2. Each operator specifies its output partitioning, via the `Partitioning` interface.
3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a `Distribution`.
4. For each operator, check each of its children and add a shuffle node above the child if the child's partitioning can not satisfy the required distribution.
5. For each operator, check if its children's output partitionings are compatible with each other, via `Partitioning.compatibleWith`.
6. If the check in 5 failed, add a shuffle above each child.
7. Try to eliminate the shuffles added in 6, via `Partitioning.guarantees`.

This design has a major problem with the definition of "compatible". `Partitioning.compatibleWith` is not well defined: ideally a `Partitioning` can't know whether it's compatible with another `Partitioning` without more information from the operator. For example, for `t1 join t2 on t1.a = t2.b`, `HashPartitioning(a, 10)` should be compatible with `HashPartitioning(b, 10)` in this case, but the partitioning itself doesn't know it.

As a result, currently `Partitioning.compatibleWith` always returns false except for literals, which makes it almost useless. This also means that if an operator has distribution requirements for multiple children, Spark always adds shuffle nodes to all the children (although some of them can be eliminated). However, there is no guarantee that the children's output partitionings are compatible with each other after adding these shuffles; we just assume that the operator will only specify `ClusteredDistribution` for multiple children.

I think it's very hard to guarantee children co-partition for all kinds of operators, and we can not even give a clear definition of co-partition between distributions like `ClusteredDistribution(a,b)` and `ClusteredDistribution(c)`.

I think we should drop the "compatible" concept from the distribution model, and let the operator achieve the co-partition requirement via special distribution requirements.

**Proposed shuffle planning logic after this PR**

(The first 4 steps are the same as before)

1. Each operator specifies the distribution requirements for its children, via the `Distribution` interface.
2. Each operator specifies its output partitioning, via the `Partitioning` interface.
3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a `Distribution`.
4. For each operator, check each of its children and add a shuffle node above the child if the child's partitioning can not satisfy the required distribution.
5. For each operator, check if its children's output partitionings have the same number of partitions.
6. If the check in 5 failed, pick the max number of partitions from the children's output partitionings, and add a shuffle to every child whose number of partitions doesn't equal the max one.

The new distribution model is very simple: we only have one kind of relationship, which is `Partitioning.satisfy`. For multiple children, Spark only guarantees they have the same number of partitions, and it's the operator's responsibility to leverage this guarantee to achieve more complicated requirements. For example, non-broadcast joins can use the newly added `HashPartitionedDistribution` to achieve co-partition. A condensed toy sketch of steps 5 and 6 appears after this description.

## How was this patch tested?

Existing tests.
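A condensed sketch of steps 5 and 6 above, with toy types rather than Spark's actual planner classes (the real logic lives in the EnsureRequirements rule and works on SparkPlan nodes):

```scala
// After each child already satisfies its own required distribution (step 4),
// steps 5 and 6 just equalize the number of partitions across children.
sealed trait Partitioning { def numPartitions: Int }
case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning

// Returns the indexes of children that need one more shuffle to reach the common
// (maximum) number of partitions.
def childrenToReshuffle(childPartitionings: Seq[Partitioning]): Seq[Int] = {
  val target = childPartitionings.map(_.numPartitions).max        // step 6: pick the max
  childPartitionings.zipWithIndex.collect {
    case (p, i) if p.numPartitions != target => i                  // step 5 check failed here
  }
}

// Example: a join whose left child has 200 partitions and right child has 50.
// Only the right child (index 1) gets an extra shuffle, up to 200 partitions.
val toReshuffle = childrenToReshuffle(
  Seq(HashPartitioning(Seq("a"), 200), HashPartitioning(Seq("b"), 50)))
// toReshuffle == Seq(1)
```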