Skip to content

Conversation

@IgorBerman
Copy link

Backporting partially from https://issues.apache.org/jira/browse/SPARK-51938

1 SPARK-41471
SPJ: Reduce Spark shuffle when only one side of a join is KeyGroupedPartitioning
Aug 24, 2023
Merged to master
#42194
cherry-picked

2 SPARK-45036
SPJ: Refactor logic to handle partially clustered distribution
Sep 5, 2023
#42757
Sep 7, 2023
cherry-picked

#42839
cherry-picked

3 SPARK-44647
Sep 11, 2023
SPJ: Support SPJ when join key is subset of partition keys
#42306
cherry-picked

6 SPARK-48065
SPJ: allowJoinKeysSubsetOfPartitionKeys is too strict
May 3, 2024
#46325
cherry-picked

following PRs/tickets weren't backported
4. SPARK-42040
SPJ: Introduce a new API for V2 input partition to report partition size
#45314
Mar 27, 2024

5 SPARK-47094
SPJ : Dynamically rebalance number of buckets when they are not equal
Apr 6, 2024
#45267

7 SPARK-48012
SPJ: Support Transfrom Expressions for One Side Shuffle Sub-task
Jun 9, 2024
#46255

  1. SPARK-47094
    SPJ : Dynamically rebalance number of buckets when they are not equal
    Oct 30, 2024
    [SPARK-47094][SQL][TEST][FOLLOWUP] SPJ : fix bucket reducer function #47126

Hisoka-X and others added 5 commits June 18, 2025 18:00
…is KeyGroupedPartitioning

### What changes were proposed in this pull request?
When only one side of a SPJ (Storage-Partitioned Join) is KeyGroupedPartitioning, Spark currently needs to shuffle both sides using HashPartitioning. However, we may just need to shuffle the other side according to the partition transforms defined in KeyGroupedPartitioning. This is especially useful when the other side is relatively small.
1. Add new config `spark.sql.sources.v2.bucketing.shuffle.enabled` to control this feature enable or not.
2. Add `KeyGroupedPartitioner` use to partition when we know the tranform value of another side (KeyGroupedPartitioning at now). Spark already know the partition value with partition id of KeyGroupedPartitioning side in `EnsureRequirements`. Then save it in `KeyGroupedPartitioner` use to shuffle another partition, to make sure the same key data will shuffle into same partition.
3. only `identity` transform will work now. Because have another problem for now, same transform between DS V2 connector implement and catalog function will report different value, before solve this problem, we should only support `identity`. eg: in test package, `YearFunction` https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala#L47 and https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala#L143

### Why are the changes needed?
Reduce data shuffle in specific SPJ scenarios

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
add new test

Closes apache#42194 from Hisoka-X/SPARK-41471_one_side_keygroup.

Authored-by: Jia Fan <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
(cherry picked from commit ce12f6d)
…red distribution

In SPJ, currently the logic to handle partially clustered distribution is a bit complicated. For instance, when the feature is eanbled (by enabling both `conf.v2BucketingPushPartValuesEnabled` and `conf.v2BucketingPartiallyClusteredDistributionEnabled`), Spark should postpone the combining of input splits until it is about to create an input RDD in `BatchScanExec`. To implement this, `groupPartitions` in `DataSourceV2ScanExecBase` currently takes the flag as input and has two different behaviors, which could be confusing.

This PR introduces a new field in `KeyGroupedPartitioning`, named `originalPartitionValues`, that is used to store the original partition values from input before splits combining  has been applied. The field is used when partially clustered distribution is enabled. With this, `groupPartitions` becomes easier to understand.

In addition, this also simplifies `BatchScanExec.inputRDD` by combining two branches where partially clustered distribution is not enabled.

To simplify the current logic in the SPJ w.r.t partially clustered distribution.

No

Existing tests.

Closes apache#42757 from sunchao/SPARK-45036.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 9e2aafb)
…ted according to partition values

### What changes were proposed in this pull request?

This PR makes sure the result grouped partitions from `DataSourceV2ScanExec#groupPartitions` are sorted according to the partition values. Previously in the apache#42757 we were assuming Scala would preserve the input ordering but apparently that's not the case.

### Why are the changes needed?

See apache#42757 (comment) for diagnosis. The partition ordering is a fundamental property for SPJ and thus must be guaranteed.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

We have tests in `KeyGroupedPartitioningSuite` to cover this.

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#42839 from sunchao/SPARK-45036-followup.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
(cherry picked from commit af1615d)
…keys

### What changes were proposed in this pull request?
- Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled
- Change key compatibility checks in EnsureRequirements.  Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case (if this flag is enabled)
- Change BatchScanExec/DataSourceV2Relation to group splits by join keys if they differ from partition keys (previously grouped only by partition values).  Do same for all auxiliary data structure, like commonPartValues.
- Implement partiallyClustered skew-handling.
  - Group only the replicate side (now by join key as well), replicate by the total size of other-side partitions that share the join key.
  - add an additional sort for partitions based on join key, as when we group the replicate side, partition ordering becomes out of order from the non-replicate side.

### Why are the changes needed?
- Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
-Added tests in KeyGroupedPartitioningSuite
-Found two existing problems, will address in separate PR:
- Because of apache#37886   we have to select all join keys to trigger SPJ in this case, otherwise DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered.  Need to see how to relax this.
- https://issues.apache.org/jira/browse/SPARK-44641 was found when testing this change.  This pr refactors some of those code to add group-by-join-key, but doesnt change the underlying logic, so issue continues to exist.  Hopefully this will also get fixed in another way.

Closes apache#42306 from szehon-ho/spj_attempt_master.

Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

(cherry picked from commit 9520087)
### What changes were proposed in this pull request?
If spark.sql.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled is true, change KeyGroupedPartitioning.satisfies0(distribution) check from all clustering keys (here, join keys)  being in partition keys, to the two sets overlapping.

  ### Why are the changes needed?
If spark.sql.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled is true, then SPJ no longer triggers if there are more join keys than partition keys. But SPJ is supported in this case if flag is false.

  ### Does this PR introduce _any_ user-facing change?
No

  ### How was this patch tested?
Added tests in KeyGroupedPartitioningSuite

 ### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#46325 from szehon-ho/fix_spj_less_join_key.

Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
(cherry picked from commit 5ec62a7)
@IgorBerman
Copy link
Author

I know about the policy not to backport improvements, but I think this PR might help many of those that want to see better SPJ and can't upgrade to 4.x yet

@dongjoon-hyun
Copy link
Member

dongjoon-hyun commented Jun 18, 2025

This is a shared community repository. Please use your repository for that kind of sharing purpose, @IgorBerman .

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants