@cloud-fan
Contributor

What changes were proposed in this pull request?

This is a followup of #44310. It turns out that relying on a TreeNodeTag in Project is too fragile: Project is a very basic node and is easily removed or transformed during plan optimization.

This PR switches to a different approach: since we can't retain the information from Aggregate (that the input data order doesn't matter), let's use it immediately. We pull the expensive part of EliminateSorts out into a new rule, so that we can safely call EliminateSorts right before we turn Aggregate into Project.

Why are the changes needed?

To make the optimizer more robust.

Does this PR introduce any user-facing change?

no

How was this patch tested?

existing tests

Was this patch authored or co-authored using generative AI tooling?

no

@github-actions github-actions bot added the SQL label Dec 20, 2023
    } else {
      s.copy(order = newOrders)
    }
  case Sort(orders, false, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
Contributor Author

This is the expensive part, as it needs to calculate the output ordering of the child.
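
For readers unfamiliar with the check in that guard, here is a minimal sketch of what "ordering satisfies" means, written in plain Scala with no Spark dependency. `Order` and `OrderingCheck` are hypothetical stand-ins invented for illustration; the real `SortOrder.orderingSatisfies` works on Catalyst expressions and is not reproduced here. The idea is a prefix test: data sorted by (a, b) is also sorted by (a).

```scala
// Simplified stand-in for a sort key: a column name plus a direction.
// (Hypothetical type; Spark's SortOrder wraps a Catalyst expression instead.)
final case class Order(column: String, ascending: Boolean = true)

object OrderingCheck {
  // True when data already sorted by `actual` is also sorted by `required`,
  // i.e. `required` is a prefix of `actual`.
  def orderingSatisfies(actual: Seq[Order], required: Seq[Order]): Boolean =
    required.length <= actual.length &&
      required.zip(actual).forall { case (r, a) => r == a }
}
```

The comparison itself is cheap; the cost mentioned in the comment comes from having to obtain the child's ordering first.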

}, plan)
val sort = plan.collectFirst { case s: SortExec => s }
if (enabled) {
  // With planned write, optimizer is more efficient and can eliminate `SORT BY value, key`.
Contributor Author

@cloud-fan cloud-fan Dec 20, 2023

This is a good side effect of this change. Before this PR, there was a conflict in EliminateSorts: ideally we remove the bottom Sort and keep the top Sort, but if the child's sort ordering satisfied the top Sort, we removed the top Sort instead. This was inconsistent and also suboptimal, as we ended up sorting by more keys.

Now the conflict is fixed: we always remove the bottom Sort first.
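
A toy illustration of the two outcomes, assuming a drastically simplified plan model (`Plan`, `Scan`, `Sort` and `SortConflict` are hypothetical names, and the local/global distinction is ignored). It is only meant to show why removing the bottom Sort first leaves fewer sort keys; it is not the optimizer's code.

```scala
// Hypothetical, simplified logical plan: a scan with optional Sort nodes on top.
sealed trait Plan
case object Scan extends Plan
final case class Sort(keys: Seq[String], child: Plan) extends Plan

object SortConflict {
  // Old preference in the conflicting case: the child's ordering already
  // satisfies the top Sort, so the top Sort is dropped.
  def removeTopIfSatisfied(plan: Plan): Plan = plan match {
    case Sort(keys, child @ Sort(childKeys, _)) if childKeys.startsWith(keys) => child
    case other => other
  }

  // Behavior described above: the Sort underneath another Sort is dropped first.
  def removeBottomFirst(plan: Plan): Plan = plan match {
    case Sort(keys, Sort(_, grandChild)) => Sort(keys, grandChild)
    case other => other
  }
}
```

For `Sort(Seq("a"), Sort(Seq("a", "b"), Scan))`, the first function keeps the two-key sort while the second keeps only the one-key sort.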

@cloud-fan
Contributor Author

   *    RepartitionByExpression, RebalancePartitions (with deterministic expressions) operators only
   *    and the Join condition is deterministic
-  * 5) if the Sort operator is within GroupBy separated by 0...n Project, Filter, Repartition or
+  * 4) if the Sort operator is within GroupBy separated by 0...n Project, Filter, Repartition or
Contributor Author

@cloud-fan cloud-fan Dec 20, 2023

This part is still in EliminateSorts, so EliminateSorts is good enough for LimitPushDown.

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM.

Member

@dongjoon-hyun dongjoon-hyun left a comment

Please fix the remaining failures.

[info] - SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column *** FAILED *** (188 milliseconds)

}

private def recursiveRemoveSort(plan: LogicalPlan, optimizeGlobalSort: Boolean): LogicalPlan = {
  if (!plan.containsPattern(SORT)) {
Contributor

Shall we pull this check out into the apply method?

Contributor Author

We should keep it here so that we can skip some children of a plan node.

Contributor

plan.containsPattern already includes the pattern bits of the children.

Contributor Author

When we traverse down a tree, we still need to apply the skipping for each plan node that has more than one child.

Contributor

Oh I see, that makes sense. Here we traverse the tree manually.
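
The point of this exchange can be sketched with a toy tree, assuming hypothetical types (`Node`, `Leaf`, `SortNode`, `Join`, `PrunedTraversal`) and a `containsSort` flag standing in for `containsPattern(SORT)`. Because the recursion is written by hand, the check runs again at every node, so a Sort-free branch of a multi-child node is returned untouched even when its sibling contains a Sort. Unlike the real rule, this toy removes every Sort unconditionally.

```scala
// Hypothetical, simplified plan tree.
sealed trait Node {
  def children: Seq[Node]
  // Stand-in for containsPattern(SORT): true if this node or any descendant is a Sort.
  lazy val containsSort: Boolean =
    this.isInstanceOf[SortNode] || children.exists(_.containsSort)
}
final case class Leaf(name: String) extends Node {
  def children: Seq[Node] = Nil
}
final case class SortNode(child: Node) extends Node {
  def children: Seq[Node] = Seq(child)
}
final case class Join(left: Node, right: Node) extends Node {
  def children: Seq[Node] = Seq(left, right)
}

object PrunedTraversal {
  // Manual recursion: the guard is evaluated per node, so each Sort-free
  // subtree is skipped without being visited or copied.
  def removeSorts(node: Node): Node =
    if (!node.containsSort) node
    else node match {
      case SortNode(child) => removeSorts(child)
      case Join(l, r)      => Join(removeSorts(l), removeSorts(r))
      case leaf: Leaf      => leaf
    }
}
```

With the guard only at the entry point, a Join whose left side contains a Sort would cause the Sort-free right side to be walked as well.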

// so here we only preserve its output partitioning using `RepartitionByExpression`.
// We should use `None` as the optNumPartitions so AQE can coalesce shuffle partitions.
// This behavior is same with original global sort.
RepartitionByExpression(sortOrder, recursiveRemoveSort(child, true), None)
Member

Hmm, previously this rule looked into this global Sort's child and removed local and global Sorts recursively, without any condition. But in the new RemoveRedundantSorts rule:

case s @ Sort(orders, true, child) =>
  val newChild = recursiveRemoveSort(child, optimizeGlobalSort = false)

recursiveRemoveSort in RemoveRedundantSorts removes a local Sort only if its child is already sorted. Are we missing this optimization?

Contributor Author

Member

@viirya viirya Dec 21, 2023

Say there are Sorts like

- Sort (local)
  - Sort (global)
    - Sort (local)

We reach:

case s @ Sort(_, global, child) => s.copy(child = recursiveRemoveSort(child, global))

Previously we could get rid of the middle global Sort and the bottom local Sort through RepartitionByExpression(sortOrder, recursiveRemoveSort(child, true), None) and:

case Sort(_, global, child) if canRemoveGlobalSort || !global =>
  recursiveRemoveSort(child, canRemoveGlobalSort)

How does EliminateSorts still do it?

The code you pointed to is the same (not changed in this PR):

case s @ Sort(_, global, child) => s.copy(child = recursiveRemoveSort(child, global))

But in recursiveRemoveSort, since canRemoveGlobalSort is false, we no longer get rid of the middle global Sort (that is now done in RemoveRedundantSorts).

Then the bottom local Sort under the rewritten RepartitionByExpression won't be optimized away, since removing it requires its child to be sorted.

Am I missing or misreading something?

Contributor Author

@cloud-fan cloud-fan Dec 21, 2023

After EliminateSorts runs, the bottom sort is removed; then RemoveRedundantSorts runs and turns the middle sort into a local sort.

These two rules are in the same batch.

@viirya
Member

viirya commented Dec 21, 2023

Test failure looks unrelated?

@cloud-fan
Contributor Author

Yeah, the PySpark failure is unrelated. Thanks for the review, merging to master!

@EnricoMi
Contributor

EnricoMi commented Dec 22, 2023

This once again breaks writing sorted partitioned files, which was last broken in 3.0.0 and fixed in 3.3.2: #38358, #39431.

When the user calls

ds.repartition(partitionColumns: _*)
  .sortWithinPartitions((partitionColumns ++ sortColumns): _*)
  .write
  .partitionBy(partitionColumns: _*)
  .parquet(...)

then the in-partition sort by sortColumns is not redundant but desired.

Is that meant to be fixed in #44458?
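
To illustrate why the in-partition sort in the snippet above is not redundant, here is a small sketch using plain Scala collections rather than Spark. `Row` and `InPartitionSort` are hypothetical names; `part` plays the role of a partition column and `value` the role of a sort column. Sorting by the partition column alone is enough to group rows for the writer, but it leaves rows inside each partition in arrival order.

```scala
// Hypothetical row type: one partition column and one sort column.
final case class Row(part: Int, value: String)

object InPartitionSort {
  val rows: Seq[Row] = Seq(Row(2, "b"), Row(1, "z"), Row(2, "a"), Row(1, "y"))

  // What a partitioned write needs: rows grouped by the partition column.
  // sortBy is stable, so rows within a partition keep their input order.
  def sortByPartitionOnly(rs: Seq[Row]): Seq[Row] = rs.sortBy(_.part)

  // What the user asked for: partition column first, then the sort column.
  def sortByPartitionAndValue(rs: Seq[Row]): Seq[Row] = rs.sortBy(r => (r.part, r.value))
}
```

If the user's sort is dropped and only the partition-column ordering is kept, each output file still contains the right rows, but no longer in the order the user requested.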

@cloud-fan
Contributor Author

@EnricoMi yes, will be fixed soon.

cloud-fan added a commit that referenced this pull request Dec 22, 2023
### What changes were proposed in this pull request?

In `V1Writes`, we try to avoid adding a Sort if the output ordering already satisfies the required ordering. However, the code is completely broken, with two issues:
- we put `SortOrder` as the child of another `SortOrder` and compare, which always returns false.
- once we add a project to do `empty2null`, we change the query output attribute id and the sort order never matches.

It's not a big issue as we still have QO rules to eliminate useless sorts, but #44429 exposes this problem because the way we optimize sorts is a bit different. For `V1Writes`, we should always avoid adding a sort even if the number of ordering keys is smaller, to not change the user query.
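
The first issue in the list above can be reproduced in miniature. The following is a sketch with hypothetical, simplified types (`Expr`, `Attr`, `SortOrder`, `WrappedSortOrder`), not the `V1Writes` code: once a sort key is wrapped in a second `SortOrder`, its child is no longer the underlying attribute, so a child-to-child comparison can never succeed.

```scala
// Hypothetical, simplified expression model.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class SortOrder(child: Expr, ascending: Boolean = true) extends Expr

object WrappedSortOrder {
  // An existing ordering satisfies a required one when both sort by the
  // same expression in the same direction.
  def satisfies(actual: SortOrder, required: SortOrder): Boolean =
    actual.child == required.child && actual.ascending == required.ascending

  val actual: SortOrder = SortOrder(Attr("p"))
  val required: SortOrder = SortOrder(Attr("p"))

  // The mistake: a SortOrder used as the child of another SortOrder.
  val wrapped: SortOrder = SortOrder(required)
}
```

Here `satisfies(actual, required)` holds, while `satisfies(actual, wrapped)` compares `Attr("p")` against a `SortOrder` and is always false.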

### Why are the changes needed?

fix code mistakes.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

updated test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #44458 from cloud-fan/sort.

Authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan

@EnricoMi
Contributor

Thanks!

pan3793 pushed a commit to pan3793/spark that referenced this pull request Oct 22, 2025
peter-toth pushed a commit that referenced this pull request Oct 22, 2025
Backport #44458 to branch-3.5.

Justification: it fixes a hidden bug (until exposed by #44429) that has existed since 3.4.

Closes #52692 from pan3793/SPARK-46485-3.5.

Authored-by: Wenchen Fan
Signed-off-by: Peter Toth