Skip to content

Conversation

@ulysses-you
Copy link
Contributor

@ulysses-you ulysses-you commented Nov 8, 2022

What changes were proposed in this pull request?

Try our best to give a stable output partitioning and ordering if current executed plan is final plan in InMemoryTableScanExec.
Make AdaptiveSparkPlanExec expose the isFinal flag

Why are the changes needed?

The cached plan in InMemoryRelation can be AdaptiveSparkPlanExec, however AdaptiveSparkPlanExec deos not specify its output partitioning and ordering. It causes unnecessary shuffle and local sort for downstream action.

          ...
           |
  AdaptiveSparkPlanExec
           |
  InMemoryTableScanExec
           |
          ...

after this pr, the InMemoryTableScanExec can preverse the output partitioning and ordering.

Does this PR introduce any user-facing change?

no, only improve performance

How was this patch tested?

add test

@github-actions github-actions bot added the SQL label Nov 8, 2022
@ulysses-you
Copy link
Contributor Author

cc @cloud-fan @maryannxue @viirya thank you


override def output: Seq[Attribute] = inputPlan.output

// Try our best to give a stable output partitioning and ordering.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to understand this "best effort". AFAIK, table cache is lazy. For a query that accesses a cached query the first time, the cached query is not executed yet so we don't know the output partitioning/ordering and can't optimize out shuffles. But when the cached query is accessed the next time, it's already executed and we know the output partitioning/ordering.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, in general the first action for a cached plan is count, e.g. CacheTableAsSelectExec, so I think it is a not big issue that we can not optimize the shuffle/sort for the first action.

The usage of the cache is: user wants to reference it multi-times, then this optimization will help a lot.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be super limited use... and cause inconsistency.

I'd only return output partitioning if there is a user repartition op in the end. In other words, only if AQE plan is required to preserve user specified partitioning.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately we hint this.. per my experience, user always caches an arbitrary df and use the cached df to build an another arbitrary df. So why can't we preserve the partitioning/ordering of the cached plan ? If you really feel inconsistency in AdaptiveSparkPlanExec, we can probably move to InMemoryRelationExec.

My original idea is to do the both but feel a little overkill (requiredOrdering should be inferred separately like #35924)

requiredDistribution.map(_.createPartitioning(conf.shufflePartitions)).getOrElse {
  if (isFinalPlan) {
    executedPlan.outputPartitioning
  } else {
    super.outputPartitioning
  }
}

A useful distribution before caching is few in production since repartition(col) will introduce skew


test("SPARK-41048: Improve output partitioning and ordering with AQE cache") {
withSQLConf(
SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after this PR, we can probably turn this on by default, to improve AQE coverage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree, We can also remvoe the internal tag

Comment on lines 116 to 117
case adaptive: AdaptiveSparkPlanExec if adaptive.isFinalized => adaptive.executedPlan
case other => other
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this only affects output partitioning and ordering after the cached relation is materialized? And if the query plan refers this cached plan is already finished with planning but not executed yet, it will still use old partitioning and ordering, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for good side, this can improve AQE coverage for some limited cases. Although I'm also worrying about some inconsistent behaviors that could be seen by end-users regarding shuffling/sorting. They might ask for questions why sometimes shuffle/sort is added but sometimes it isn't.

I guess that might require users more tricks on optimizing cached AQE relation in practice.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another idea is to materialize the AQE plan eagerly so that even the first cache access can be optimized. However, this requires triggering query execution during query planning, which is a bit risky.

A good practice is to ask users to do query caching eagerly, e.g. do a df.count right after the df is cached. Then they won't observe inconsistencies. Anyway, I think this PR is a net win as it optimizes all the following cache accesses after the first access. This is important for query caching as cache is meant to be accessed repeatedly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this requires some good practice when caching query and using it. Otherwise, this looks a good one.


def executedPlan: SparkPlan = currentPhysicalPlan

def isFinalized: Boolean = isFinalPlan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do private var _isFinalPlan and def isFinalPlan instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, addressed

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in d218013 Nov 17, 2022
@ulysses-you ulysses-you deleted the aqe-cache branch November 17, 2022 05:34
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
…cache

### What changes were proposed in this pull request?

Try our best to give a stable output partitioning and ordering if current executed plan is final plan in `InMemoryTableScanExec`.
Make AdaptiveSparkPlanExec expose the isFinal flag

### Why are the changes needed?

The cached plan in InMemoryRelation can be AdaptiveSparkPlanExec, however AdaptiveSparkPlanExec deos not specify its output partitioning and ordering. It causes unnecessary shuffle and local sort for downstream action.

```
          ...
           |
  AdaptiveSparkPlanExec
           |
  InMemoryTableScanExec
           |
          ...
```
after this pr, the `InMemoryTableScanExec` can preverse the output partitioning and ordering.

### Does this PR introduce _any_ user-facing change?

no, only improve performance

### How was this patch tested?

add test

Closes apache#38558 from ulysses-you/aqe-cache.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 15, 2022
…cache

### What changes were proposed in this pull request?

Try our best to give a stable output partitioning and ordering if current executed plan is final plan in `InMemoryTableScanExec`.
Make AdaptiveSparkPlanExec expose the isFinal flag

### Why are the changes needed?

The cached plan in InMemoryRelation can be AdaptiveSparkPlanExec, however AdaptiveSparkPlanExec deos not specify its output partitioning and ordering. It causes unnecessary shuffle and local sort for downstream action.

```
          ...
           |
  AdaptiveSparkPlanExec
           |
  InMemoryTableScanExec
           |
          ...
```
after this pr, the `InMemoryTableScanExec` can preverse the output partitioning and ordering.

### Does this PR introduce _any_ user-facing change?

no, only improve performance

### How was this patch tested?

add test

Closes apache#38558 from ulysses-you/aqe-cache.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022
…cache

### What changes were proposed in this pull request?

Try our best to give a stable output partitioning and ordering if current executed plan is final plan in `InMemoryTableScanExec`.
Make AdaptiveSparkPlanExec expose the isFinal flag

### Why are the changes needed?

The cached plan in InMemoryRelation can be AdaptiveSparkPlanExec, however AdaptiveSparkPlanExec deos not specify its output partitioning and ordering. It causes unnecessary shuffle and local sort for downstream action.

```
          ...
           |
  AdaptiveSparkPlanExec
           |
  InMemoryTableScanExec
           |
          ...
```
after this pr, the `InMemoryTableScanExec` can preverse the output partitioning and ordering.

### Does this PR introduce _any_ user-facing change?

no, only improve performance

### How was this patch tested?

add test

Closes apache#38558 from ulysses-you/aqe-cache.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants