Skip to content

Conversation

@peter-toth
Copy link
Contributor

@peter-toth peter-toth commented Sep 28, 2022

What changes were proposed in this pull request?

This PR introduce TreeNode.multiTransform() methods to be able to recursively transform a TreeNode (and so a tree) into multiple alternatives. These functions are particularly useful if we want to transform an expression with a projection in which subexpressions can be aliased with multiple different attributes.

E.g. if we have a partitioning expression HashPartitioning(a + b) and we have a Project node that aliases a as a1 and a2 and b as b1 and b2 we can easily generate a stream of alternative transformations of the original partitioning:

// This is a simplified test, some arguments are missing to make it conciese
val partitioning = HashPartitioning(Add(a, b))
val aliases: Map[Expression, Seq[Attribute]] = ... // collect the alias map from project
val s = partitioning.multiTransform {
  case e: Expression if aliases.contains(e.canonicalized) => aliases(e.canonicalized)
}
s // Stream(HashPartitioning(Add(a1, b1)), HashPartitioning(Add(a1, b2)), HashPartitioning(Add(a2, b2)), HashPartitioning(Add(a2, b2)))

The result of multiTransform is a lazy stream to be able to limit the number of alternatives generated at the caller side as needed.

Why are the changes needed?

TreeNode.multiTransform() is a useful helper method.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New UTs are added.

@peter-toth
Copy link
Contributor Author

peter-toth commented Sep 28, 2022

I've opened 3 WIP PRs to demonstrate the usage of multiTransform():

Please note that all 3 are based on this PR so currently they contain the changes of this PR too.

@peter-toth
Copy link
Contributor Author

@cloud-fan, @gengliangwang, @sigmod could you please take a look at this PR?

@peter-toth peter-toth force-pushed the SPARK-40599-multitransform branch from 6bd10b6 to cc73d00 Compare December 27, 2022 14:27
@peter-toth
Copy link
Contributor Author

@cloud-fan, @sigmod I still think multiTransform() can be useful helper. Please find a few examples a previous comment. I've rebased the PRs on the latest master once more. Let me know if you are intterested. If not, I will drop these PRs.

@sigmod
Copy link
Contributor

sigmod commented Dec 28, 2022

@peter-toth:

  • do you have more concrete examples in mind for multiTransform?
  • It seems that we can easily run into exponential alternatives?
  • I assume you eventually will only choose one from those alternatives?

@peter-toth
Copy link
Contributor Author

peter-toth commented Dec 29, 2022

Hi @sigmod,

do you have more concrete examples in mind for multiTransform?

I gave 3 concrete examples where multiTranform could help. All of these PRs contain 2 commits. The first commits are all equal to this PR, the seconds contain the suggested changes utilizing multiTranform.

It seems that we can easily run into exponential alternatives?

That's correct but that's why multiTransform returns a lazy Stream. To let the caller decide how many results they need and don't calculate the alternatives in advance.
In the fist example the number of elements in the partitioning collection is limited by a new spark.sql.aliasAwareOutputPartitioning.expandLimit config. The 2nd example uses the already introduded spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit config. The 3rd example doesn't limit the number of constraints as the original code doesn't do that. (But we could easily introduce a finer grained limit instead of just beeing able to enable/disable spark.sql.constraintPropagation.enabled.)

I assume you eventually will only choose one from those alternatives?

No, the main pont in the above examples is that we need a set of results.

But, this new method also gives the oportunitly to limit the number of alternatives returned by time. E.g. we could spend a certain max number of seconds generating and evaulating alternative trees to find the best one...

@peter-toth
Copy link
Contributor Author

peter-toth commented Jan 5, 2023

@sigmod let me know if you need more examples or I should ellaboreate on the aboves.
Also cc @cloud-fan, please share your thoughts.

@cloud-fan
Copy link
Contributor

I'm a bit hesitant to touch the core TreeNode library as these things can be hard to generalize. For example, for output ordering a + b, if the project list is a, a as x, b as y1, b as y2, we should produce a + y1, a + y2, x + y2, x + y2. Note that:

  1. sometimes we only want to replace one alias as the original attribute a is still in the output
  2. We might do some early pruning to avoid exponential growth
  3. We can strip some expression from the project list, e.g. empty2null(a) -> a

Are you sure the multiTransform can meet all these requirements? Can you give an example about how to use it to implement the algorithm in #39556 (comment) ?

@peter-toth
Copy link
Contributor Author

peter-toth commented Jan 13, 2023

  1. sometimes we only want to replace one alias as the original attribute a is still in the output

multiTransform accepts a rule with type PartialFunction[BaseType, Seq[BaseType]] so the transformation can contain the original item itself . E.g. a -> Seq(a, x) in your example. This is how the aliasMap is built in https://github.com/apache/spark/pull/37525/files#diff-a2a190aab57ed84a61da9c3a1b57a79a65b0f31537d81450ad4ba2c199aad2c1R31-R52.

  1. We might do some early pruning to avoid exponential growth

I think by early pruning you meant that some of the mapped expressions should comply some constraint. E.g. the mapped expressions should be unique in terms of their canonicalized form. HashPartitioning(a + b) and HashPartitioning(b + a) are the same so there should be only one of them in the output set of mapped expressions. This can be achieved by filtering the output of multiTransform() like I do in https://github.com/apache/spark/pull/37525/files#diff-a2a190aab57ed84a61da9c3a1b57a79a65b0f31537d81450ad4ba2c199aad2c1R98-R101 (expressionPartitionings.add(e.canonicalized)).
I think the other filter e.references.subsetOf(outputSet) shouldn't actually be there as

  • we could remove those expressions from child.outputPartitioning that contain attributes missing from outputSet and non-transformable (no alias defined) before running the multiTransform
  • we shouldn't give a rule to multiTransform that transforms an expression to an attribute that is not in outputSet

but that PR is WIP. It's main purpose would be to show multiTransform usage.

  1. We can strip some expression from the project list, e.g. empty2null(a) -> a

I'm not sure why don't we do that change in a simple transform() before generating the results. But you can also do that with empty2null(a) -> Seq(a) using multiTransform().

@peter-toth
Copy link
Contributor Author

@cloud-fan I've updated my previous comment with the right example for 2.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good example, and explains why we don't have multiTransformUp. Can we highlight this and remove multiTransform as callers should be aware that it's tramsform down?

Copy link
Contributor Author

@peter-toth peter-toth Jan 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is possible to add a multiTransformUp() version, although this PR doesn't contain it and I'm not sure when it would make sense to use it instead of the current top-down version.
I've removed the general multiTransform and kept the expilicit multiTransformDown versions.
I've also added a few more details to the examples. Let me know it we need more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not very clear to me that why we need this bool flag.

Copy link
Contributor Author

@peter-toth peter-toth Jan 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flag is an internal flag and multiTransform and multiTransformWithPruning doesn't even return it. Probably multiTransformDownWithPruning should hide it as well using a private helper.

It is used internally after a non-leaf node is transformed to some alternatives but those alternatives doesn't contain the original node itself. E.g. please see a bit above that Add(a, b) -> Seq(11, 12, 21, 22), but the Seq doesn't contain Add(a, b) itself.
Now in this case we still need to consider the original Add(a, b) and traverse down and consider alternatives that might transform a or b.
This is done by adding Add(a, b) with a childrenTransformRequired=true flag to the afterRulesStream stream.
The returned flag you are asking about (we can call it childrenTransformed), is kind of the pair ot the previous flag.
Once the transformation of children are done that flag is "propagated up" and based on both flags we can decide if we need to add the Add(a, b) (with its children transformed) to the valid alternatives (!childrenTransformRequired || childrenTransformed). We do this in this collect:
https://github.com/apache/spark/blob/cc73d00f3e608076feebc65737e53e6454845bd4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L762-L768

Please note that there is another flag transformed in the afterRulesStream. If is pretty similar to negated childrenTransformRequired but they are not always negate each other to handle nodes where the rule didn't apply.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed, I've removed the autoContinue feature so these flags have been removed from the latest version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan, I realized that this semantics, that the rule returns Seq.emty for a node, could be used for early pruning.
Currently it means no transformation as if the rule didn't apply or returned a one element Seq with the original node.
But returned Seq.empty could mean early pruning to to acomplish something like pruneFunc does it @ulysses-you's latest version: #39556
Let me add that functionality here next week.

@peter-toth peter-toth force-pushed the SPARK-40599-multitransform branch from cc73d00 to b99f2f9 Compare January 15, 2023 09:17
… explicit flag to enable the `autoContinue` feature, add more examples, remove general versions to highlight this is a top-down rule
@peter-toth peter-toth force-pushed the SPARK-40599-multitransform branch from 690e0c5 to 8de8f88 Compare January 16, 2023 15:00
/**
* Returns alternative copies of this node where `rule` has been recursively applied to the tree.
*
* Users should not expect a specific directionality. If a specific directionality is needed,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the comment needs an update. We should also explain why only the down direction is provided.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've modified the scala docs, let me know if we need more defails or fixes.

* Returns alternative copies of this node where `rule` has been recursively applied to the tree.
*
* Users should not expect a specific directionality. If a specific directionality is needed,
* multiTransformDownWithPruning or multiTransformUpWithPruning should be used.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* Users should not expect a specific directionality. If a specific directionality is needed,
* multiTransformDownWithPruning or multiTransformUpWithPruning should be used.
*
* @param rule a function used to generate transformed alternatives for a node
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @param rule a function used to generate transformed alternatives for a node
* @param rule a function used to generate alternatives for a node

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* of its children (pre-order).
*
* As it is very easy to generate enormous number of alternatives when the input tree is huge or
* when the rule returns large number of alternatives, this function returns the alternatives as a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* when the rule returns large number of alternatives, this function returns the alternatives as a
* when the rule returns many alternatives for many nodes, this function returns the alternatives as a

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* valid alternative.
*
* The rule can return `Stream.empty` to indicate that the original node should be pruned. In this
* case `multiTransform` returns an empty `Stream`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also mention, if the rule applies but you want the not-apply behavior, you can just return Seq(originalNode)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

*
* 3.
* It is not always easy to determine if we will do any child expression mapping but we can enable
* the `autoContinue` flag to get the same result:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit worried about making multiTransform too complicated. Given the only use case for now is projecting expressions such as output partitions/orderings, can we simplify the rule a little bit? My preference is to remove this autoContinue flag and fully rely on the callers.

It is not always easy to determine if we will do any child expression mapping

If we have a real SQL use case, I'm happy to change my mind.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, indeed. And I don't have any other use case either, so I will remove that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, removed.

@cloud-fan
Copy link
Contributor

I think this is more efficient than keeping a candidate plans list and transforming all the plans in the list and repeating, as I mentioned in #39556 (comment) . I'll take a closer look after the autoContinue flag is removed. Great idea and great job!

Comment on lines 818 to 820
}.map { rewritten_plan =>
if (this eq rewritten_plan) {
markRuleAsIneffective(ruleId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if my understanding is correct, the only way to mark this rule as ineffective is that it returns stream with one alternative which is eq to this ? then why we use map here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, thanks, this looks wrong. I will fix it with other requests today.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Latest commit fixes this, although there is relevant issue that I'm not sure we can mark the rule ineffective if the one element original stream is returned:
https://github.com/apache/spark/pull/38034/files#diff-94575875fbf007fdaf43e4946c69c18649294fed974a46816ab1986f6350541bR691-R699

Copy link
Contributor

@cloud-fan cloud-fan Jan 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's OK. We can mark the rule as ineffective only if the partial function does not apply. Once it applies, no matter what it returns, it's effective.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems fine

…ark a rule ineffective, move generateChildrenSeq as a top level helper as it will help with early pruning
}

private def generateChildrenSeq[T](childrenStreams: Seq[Stream[T]]): Stream[Seq[T]] = {
childrenStreams.foldRight(Stream(Seq.empty[T]))((childrenStream, childrenSeqStream) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we fold from right to left?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to generate alternatives for the first children of an expression first.
E.g. if a + b is the input and a => Stream(a1, a2) and b => Stream(b1, b2) is the rule then I wanted to get the Stream(a1 + b1, a2 + b1, a1 + b2, a2 + b2) output in this order.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in d8d604b Jan 17, 2023
@peter-toth
Copy link
Contributor Author

Thanks for the review @cloud-fan, @ulysses-you!

cloud-fan pushed a commit that referenced this pull request Jan 19, 2023
…es to be any kinds of Seq

### What changes were proposed in this pull request?
This is a follow-up PR to #38034. It relaxes `multiTransformDown()`'s `rule` parameter type to accept any kinds of `Seq` and make `MultiTransform.generateCartesianProduct()` helper public.

### Why are the changes needed?
API mprovement.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #39652 from peter-toth/SPARK-40599-multitransform-follow-up.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants