Skip to content

Conversation

@ulysses-you
Copy link
Contributor

@ulysses-you ulysses-you commented Jan 13, 2023

What changes were proposed in this pull request?

This pr moves AliasAwareOutputExpression from core to catalyst so both logical plan and physical plan can use it.

Improve the code of replace alias to support multi-alias so we can preverse ordering with all of aliased, for example:

SELECT c, c as x, c as y FROM (SELECT * FROM t ORDER BY c)

Improve the AliasAwareQueryOutputOrdering to support strip expression which does not affect result. For example Empty2Null.

Why are the changes needed?

AliasAwareOutputExpression now does not support if an attribute has more than one alias, and AliasAwareOutputExpression should also work for LogicalPlan.

Does this PR introduce any user-facing change?

improve performance and this also fix the issue in pr #39475

How was this patch tested?

add test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need anything from the QueryPlan? Can it simply be trait AliasAwareOutputExpression extends SQLConfHelper?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need override outputOrdering and outputPartitioning

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AliasAwareOutputExpression itself can be simplified

Comment on lines 44 to 45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val aliasArray = attrWithAliasMap.getOrElseUpdate(strip(key).canonicalized,
new ArrayBuffer[Attribute]())
val aliasArray = attrWithAliasMap.getOrElseUpdate(
strip(key).canonicalized, new ArrayBuffer[Attribute]())

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we return a empty map immediately if aliasCandidateLimit < 1? I think it's better than checking it at https://github.com/apache/spark/pull/39556/files#diff-2d06454bd3d4226cab8749376af5298599e0d5a1de175d9ba462608390d7d593R64

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only append the new expr if all its references are contained by outputExpressions.map(_.toAttribute)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newExpr can contain other reference. for example df.orderby($"a" + $"b").selectExpr("a as x"), we only replace a to x but the expression Add has an another attribute b.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this case, we should not report the ordering as x + b, as b is not even outputted by the plan.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a little hard do this in AliasAwareOutputExpression. for example PartitioningCollection(a, b) and a alias to x. If we want to return PartitioningCollection(a) only, then we need to prune b. It should be handled at AliasAwareOutputPartitioning?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't return PartitioningCollection(a) only. If a relation's child is partitioned by a and b, but b is not outputted by this relation (no alias either), then it's wrong to say this relation is partitioned by a. It can only be UnknownPartitioning.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PartitioningCollection(a, b) means t1 join t2 on a = b, not group by a, b..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, then it's a flatMap semantic

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this means we don't do any alias replacement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no it just specifies it's outputExpressions to AliasAwareOutputExpression so that AliasAwareOutputExpression can build alias map

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but child.output has no alias at all, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Project has overridden it..

override protected def outputExpressions: Seq[NamedExpression] = projectList

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which subclass uses the default implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filter/Limit etc..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
buildConf("spark.sql.optimizer.outputPartitioningAndOrderingCandidateLimit")
buildConf("spark.sql.optimizer.expressionProjectionCandidateLimit")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may apply it to more places like constraint, let's be general.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is correct. It only replace one alias from the input expressions. What happens if the output ordering is a + b and the alias is a as x, b as y? will we return x + y?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the algorithm should be

candidates = e
for ((expr, aliases) <- aliasMap) {
  val newCandicates = candidates.transform ...
  candidates ++= newCandicates
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also add some early pruning

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. I'd say no for now. Idealy, we should return a + y, x + b, x + y. The current code can not return x + y. But it seems a corner case.

It assumes the input is: a, a as x, a as y which is more likely happen..

Copy link
Contributor

@peter-toth peter-toth Jan 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ulysses-you, @cloud-fan please take a look at my #37525 that is based on a new helper TreeNode.multiTransform() that I would like to add in #38034.
IMO TreeNode.multiTransform() would be a useful helper function to solve issues like this one and some others: #38034 (comment)

@cloud-fan
Copy link
Contributor

One principle we should hold: a plan's output partitioning/ordering must only contain the attributes from its output, otherwise the semantic is hard to define. What do you mean your data is partitioned by a non-existing column?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think with this loop on aliasMap elements one by one and always adding new elements to normalizedCandidates and then do some filtering after the aliasMap loop you might do the same issue as described 3rd in #38034 (comment) (constraint generation)

@ulysses-you
Copy link
Contributor Author

@cloud-fan addreesed all comments, thank you @peter-toth

Comment on lines 1365 to 1376
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test is for the comment #39556 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass a prune function to handle PartitionCollection and sameOrderExpression.

@ulysses-you ulysses-you changed the title [WIP][SPARK-42049][SQL] Improve AliasAwareOutputExpression [SPARK-42049][SQL] Improve AliasAwareOutputExpression Jan 14, 2023
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to also handle such as RangePartitioning

Comment on lines +440 to +442
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.doc("The maximum number of the candidate of out put expressions whose alias are replaced." +
" It can preserve the output partitioning and ordering." +
" Negative value means disable this optimization.")
.doc("The maximum number of candidates for output expressions whose aliases are replaced." +
" This can preserve the output partitioning and ordering." +
" A negative value means to disable this optimization.")

Copy link
Contributor

@EnricoMi EnricoMi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes #38356 / SPARK-40885.

The example now writes this plan, as expected:

WriteFiles
+- *(1) Project [id#10, sort_col#11, empty2null(p#12) AS p#19]
   +- *(1) Sort [p#12 ASC NULLS FIRST, sort_col#11 ASC NULLS FIRST], false, 0
      +- ShuffleQueryStage 0
         +- Exchange SinglePartition, REPARTITION_BY_NUM, [plan_id=18]
            +- LocalTableScan [id#10, sort_col#11, p#12]

@ulysses-you ulysses-you closed this Feb 1, 2023
@ulysses-you ulysses-you deleted the SPARK-42049 branch February 1, 2023 05:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants