Skip to content

Conversation

@wangyum
Copy link
Member

@wangyum wangyum commented Oct 2, 2022

What changes were proposed in this pull request?

Similar to PullOutGroupingExpressions. This PR adds a new rule(PullOutComplexJoinCondition) to pull out complex join condition.

Why are the changes needed?

Pull out complex join condition has following advantage:

  1. Reduce the number of complex expression evaluations from 3 to 2 times if it is SortMergeJoin..
  2. Infer more additional filters, sometimes can avoid data skew. For example: [SPARK-31809][SQL] Infer IsNotNull from some special equality join keys #28642
  3. Avoid other rules also need to handle complex condition. For example: https://github.com/apache/spark/blob/dee7396204e2f6e7346e220867953fc74cd4253d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushPartialAggregationThroughJoin.scala#L325-L327

For example:

CREATE TABLE t1 (item_id BIGINT, event_type STRING, dt STRING) USING parquet PARTITIONED BY (dt);
CREATE TABLE t2 (item_id BIGINT, cal_dt DATE) using parquet;
set spark.sql.autoBroadcastJoinThreshold=-1;

SELECT a.item_id,
       a.event_type
FROM   t1 a
       INNER JOIN (SELECT DISTINCT cal_dt,
                                   item_id
                   FROM   t2) b
               ON a.item_id = b.item_id
                  AND To_date(a.dt, 'yyyyMMdd') = b.cal_dt
WHERE To_date(a.dt, 'yyyyMMdd') = date '2022-10-01';

Before this PR:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [item_id#28L, event_type#29]
   +- SortMergeJoin [cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date), item_id#28L], [cal_dt#32, item_id#31L], Inner
      :- Sort [cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date) ASC NULLS FIRST, item_id#28L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date), item_id#28L, 5), ENSURE_REQUIREMENTS, [plan_id=78]
      :     +- Filter isnotnull(item_id#28L)
      :        +- FileScan parquet spark_catalog.default.t1[item_id#28L,event_type#29,dt#30]
      +- Sort [cal_dt#32 ASC NULLS FIRST, item_id#31L ASC NULLS FIRST], false, 0
         +- HashAggregate(keys=[cal_dt#32, item_id#31L], functions=[])
            +- Exchange hashpartitioning(cal_dt#32, item_id#31L, 5), ENSURE_REQUIREMENTS, [plan_id=74]
               +- HashAggregate(keys=[cal_dt#32, item_id#31L], functions=[])
                  +- Filter (isnotnull(item_id#31L) AND isnotnull(cal_dt#32))
                     +- FileScan parquet spark_catalog.default.t2[item_id#31L,cal_dt#32]

After this PR:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [item_id#28L, event_type#29]
   +- SortMergeJoin [CAST(gettimestamp(a.#37, item_id#28L], [cal_dt#32, item_id#31L], Inner
      :- Sort [CAST(gettimestamp(a.#37 ASC NULLS FIRST, item_id#28L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(CAST(gettimestamp(a.#37, item_id#28L, 5), ENSURE_REQUIREMENTS, [plan_id=78]
      :     +- Project [item_id#28L, event_type#29, cast(gettimestamp(dt#30, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date) AS CAST(gettimestamp(a.#37]
      :        +- Filter isnotnull(item_id#28L)
      :           +- FileScan parquet spark_catalog.default.t1[item_id#28L,event_type#29,dt#30]
      +- Sort [cal_dt#32 ASC NULLS FIRST, item_id#31L ASC NULLS FIRST], false, 0
         +- HashAggregate(keys=[cal_dt#32, item_id#31L], functions=[])
            +- Exchange hashpartitioning(cal_dt#32, item_id#31L, 5), ENSURE_REQUIREMENTS, [plan_id=74]
               +- HashAggregate(keys=[cal_dt#32, item_id#31L], functions=[])
                  +- Filter (((cal_dt#32 = 2022-10-01) AND isnotnull(item_id#31L)) AND isnotnull(cal_dt#32))
                     +- FileScan parquet spark_catalog.default.t2[item_id#31L,cal_dt#32]

Performance evaluation:

Spark Job Duration(min)
Vanilla 38
Pull out complex join condition and disable spark.sql.constraintPropagation.enabled 3.6
Pull out complex join condition and enable spark.sql.constraintPropagation.enabled 1.8

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test.

@github-actions github-actions bot added the SQL label Oct 2, 2022
@wangyum
Copy link
Member Author

wangyum commented Nov 4, 2022

Case from production:

Before After
image image

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Feb 13, 2023
@github-actions github-actions bot closed this Feb 14, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant