Skip to content

Conversation

@tgravescs
Copy link
Contributor

...sql pom files

@AmplabJenkins
Copy link

Merged build triggered. Build is starting -or- tests failed to complete.

@AmplabJenkins
Copy link

Merged build started. Build is starting -or- tests failed to complete.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13549/

@pwendell
Copy link
Contributor

LGTM - feel free to merge this!

@pwendell
Copy link
Contributor

Okay just went ahead and merged this.

@asfgit asfgit closed this in 3738f24 Mar 29, 2014
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
…ew ...

...sql pom files

Author: Thomas Graves <[email protected]>

Closes apache#263 from tgravescs/SPARK-1345 and squashes the following commits:

b43a2a0 [Thomas Graves] SPARK-1345 adding missing dependency on avro for hadoop 0.23 to the new sql pom files
mccheah pushed a commit to mccheah/spark that referenced this pull request Oct 12, 2017
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request Sep 11, 2019
arjunshroff pushed a commit to arjunshroff/spark that referenced this pull request Nov 24, 2020
wangyum added a commit that referenced this pull request Aug 19, 2021
…runing

### What changes were proposed in this pull request?

Remove `OptimizeSubqueries` from batch of `PartitionPruning` to make DPP support more cases. For example:
```sql
SELECT date_id, product_id FROM fact_sk f
JOIN (select store_id + 3 as new_store_id from dim_store where country = 'US') s
ON f.store_id = s.new_store_id
```

Before this PR:
```
== Physical Plan ==
*(2) Project [date_id#3998, product_id#3999]
+- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#274]
      +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
         +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
            +- *(1) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
```

After this PR:
```
== Physical Plan ==
*(2) Project [date_id#3998, product_id#3999]
+- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN dynamicpruning#4007)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
   :        +- SubqueryBroadcast dynamicpruning#4007, 0, [new_store_id#3997], [id=#263]
   :           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262]
   :              +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
   :                 +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
   :                    +- *(1) ColumnarToRow
   :                       +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
   +- ReusedExchange [new_store_id#3997], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262]
```
This is because `OptimizeSubqueries` will infer more filters, so we cannot reuse broadcasts. The following is the plan if disable `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly`:
```
== Physical Plan ==
*(2) Project [date_id#3998, product_id#3999]
+- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN subquery#4009)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
   :        +- Subquery subquery#4009, [id=#284]
   :           +- *(2) HashAggregate(keys=[new_store_id#3997#4008], functions=[])
   :              +- Exchange hashpartitioning(new_store_id#3997#4008, 5), ENSURE_REQUIREMENTS, [id=#280]
   :                 +- *(1) HashAggregate(keys=[new_store_id#3997 AS new_store_id#3997#4008], functions=[])
   :                    +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
   :                       +- *(1) Filter (((isnotnull(store_id#4002) AND isnotnull(country#4004)) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
   :                          +- *(1) ColumnarToRow
   :                             +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(store_id#4002), isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002..., Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(store_id), IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#305]
      +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
         +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
            +- *(1) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
```

### Why are the changes needed?

Improve DPP to support more cases.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test and benchmark test:
SQL | Before this PR(Seconds) | After this PR(Seconds)
-- | -- | --
TPC-DS q58 | 40 | 20
TPC-DS q83 | 18 | 14

Closes #33664 from wangyum/SPARK-36444.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
wangyum added a commit that referenced this pull request Aug 19, 2021
…runing

### What changes were proposed in this pull request?

Remove `OptimizeSubqueries` from batch of `PartitionPruning` to make DPP support more cases. For example:
```sql
SELECT date_id, product_id FROM fact_sk f
JOIN (select store_id + 3 as new_store_id from dim_store where country = 'US') s
ON f.store_id = s.new_store_id
```

Before this PR:
```
== Physical Plan ==
*(2) Project [date_id#3998, product_id#3999]
+- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#274]
      +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
         +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
            +- *(1) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
```

After this PR:
```
== Physical Plan ==
*(2) Project [date_id#3998, product_id#3999]
+- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN dynamicpruning#4007)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
   :        +- SubqueryBroadcast dynamicpruning#4007, 0, [new_store_id#3997], [id=#263]
   :           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262]
   :              +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
   :                 +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
   :                    +- *(1) ColumnarToRow
   :                       +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
   +- ReusedExchange [new_store_id#3997], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262]
```
This is because `OptimizeSubqueries` will infer more filters, so we cannot reuse broadcasts. The following is the plan if disable `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly`:
```
== Physical Plan ==
*(2) Project [date_id#3998, product_id#3999]
+- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN subquery#4009)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
   :        +- Subquery subquery#4009, [id=#284]
   :           +- *(2) HashAggregate(keys=[new_store_id#3997#4008], functions=[])
   :              +- Exchange hashpartitioning(new_store_id#3997#4008, 5), ENSURE_REQUIREMENTS, [id=#280]
   :                 +- *(1) HashAggregate(keys=[new_store_id#3997 AS new_store_id#3997#4008], functions=[])
   :                    +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
   :                       +- *(1) Filter (((isnotnull(store_id#4002) AND isnotnull(country#4004)) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
   :                          +- *(1) ColumnarToRow
   :                             +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(store_id#4002), isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002..., Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(store_id), IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#305]
      +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
         +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
            +- *(1) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
```

### Why are the changes needed?

Improve DPP to support more cases.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test and benchmark test:
SQL | Before this PR(Seconds) | After this PR(Seconds)
-- | -- | --
TPC-DS q58 | 40 | 20
TPC-DS q83 | 18 | 14

Closes #33664 from wangyum/SPARK-36444.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
(cherry picked from commit 2310b99)
Signed-off-by: Yuming Wang <[email protected]>
turboFei added a commit to turboFei/spark that referenced this pull request Nov 6, 2025
…33] Backport insert operation lock (apache#197)

* [HADP-40184]Backport insert operation lock (#15)

[HADP-31946] Fix data duplicate on application retry and support concurrent write to different partitions in the same table.[HADP-33040][HADP-33041] Optimize merging staging files to output path and detect conflict with HDFS file lease.
HADP-34738] During commitJob, merge paths with multi threads (apache#218)
[HADP-36251] Enhance the concurrent lock mechanism  for insert operation (apache#272)
[HADP-37137] Add option to disable insert operation lock to write partitioned table (apache#286)

* [HADP-46224] Do not overwrite the lock file when creating lock (apache#133)

* [HADP-46868] Fix Spark merge path race condition (apache#161)

* [HADP-50903] Ignore the error message if insert operation lock file has been deleted (apache#271)

* [HADP-50733] Enhance the error message on picking insert operation lock failure (apache#267)

* Fix

* Fix

* Fix

* fix

* Fix

* Fix

* Fix

* Fix

* Fix

* [HADP-50574] Support to create the lock file for EC enabled path (apache#263)

* [HADP-50574][FOLLOWUP] Add parameter type when getting overwrite method (apache#265)

* [HADP-50574][FOLLOWUP] Add UT for creating ec disabled lock file and use underlying DistributedFileSystem for ViewFileSystem (apache#266)

* Fix

* Fix

* Fix

* [HADP-34612][FOLLOWUP] Do not show the insert local error by removing the being written stream from dfs client (apache#288)

* Enabled Hadoop 3

---------

Co-authored-by: fwang12 <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants