Conversation

@allisonwang-db (Contributor):

What changes were proposed in this pull request?

This PR updates ProjectionOverSchema to use the outputs of the data source relation to filter the attributes in the nested schema pruning. This is needed because the attributes in the schema do not necessarily belong to the current data source relation. For example, if a filter contains a correlated subquery, then the subquery's children can contain attributes from both the inner query and the outer query. Since the RewriteSubquery batch happens after early scan pushdown rules, nested schema pruning can wrongly use the inner query's attributes to prune the outer query data schema, thus causing wrong results and unexpected exceptions.
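As a concrete illustration, a query shaped like the regression test added in this PR is enough to hit the bug (a sketch; `contacts` and `employees` are the nested-schema fixture tables used by the test suite):

```scala
// Sketch of the failing shape. The correlated NOT EXISTS condition references
// attributes of both the outer relation (c) and the inner relation (e), so
// before subqueries are rewritten, the Filter's expression tree contains
// attributes that do not all belong to the relation being scanned.
val query = sql(
  """select count(*)
    |from contacts c
    |where not exists (select null from employees e where e.name.first = c.name.first
    |  and e.employer.name = c.employer.company.name)
    |""".stripMargin)
```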

Why are the changes needed?

To fix a bug in SchemaPruning.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit test

github-actions bot added the SQL label (Apr 16, 2022)
@HyukjinKwon (Member):

cc @viirya fyi

 * attributes in the schema that do not belong to the current relation.
 */
-case class ProjectionOverSchema(schema: StructType) {
+case class ProjectionOverSchema(schema: StructType, output: Option[AttributeSet] = None) {
Member:
We don't always need it? It is None by default.

Contributor:
Looks like AttributeSet is required for correctness. If we make it required, can we drop fieldNames var below and just check the attribute set?

Member:
SGTM
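
For reference, if `output` is made a required `AttributeSet` as suggested, the attribute case could look roughly like the sketch below (illustrative only, not the exact merged code; the recursive `GetStructField`/`GetArrayStructFields` cases are elided):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType

case class ProjectionOverSchema(schema: StructType, output: AttributeSet) {
  private val fieldNames = schema.fieldNames.toSet

  private def getProjection(expr: Expression): Option[Expression] = expr match {
    // Rewrite an attribute only if it exists in the pruned schema AND belongs
    // to the current relation's output, so that an outer-query attribute is
    // never pruned against an inner-query schema.
    case a: AttributeReference if fieldNames.contains(a.name) && output.contains(a) =>
      Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier))
    case _ => None // extractor cases elided in this sketch
  }
}
```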

|where not exists (select null from employees e where e.name.first = c.name.first
| and e.employer.name = c.employer.company.name)
|""".stripMargin)
checkAnswer(query, Row(3))
Member:
Should we check the pruned schema too?
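
One way to do that with the suite's `checkScan` helper (a sketch; the expected catalog strings below are illustrative, not taken from the merged test):

```scala
// Hypothetical addition: also assert the pruned read schema of each scan,
// one catalog string per relation read by the query.
checkScan(query,
  "struct<name:struct<first:string>,employer:struct<company:struct<name:string>>>",
  "struct<name:struct<first:string>,employer:struct<name:string>>")
```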

}
}

testSchemaPruning("SPARK-38918: nested schema pruning with correlated subqueries") {
Member:
Without this PR, this test failed with `java.lang.RuntimeException: Once strategy's idempotence is broken for batch RewriteSubquery`.

Contributor Author:
Yes, it looks like a separate issue with column pruning and subquery rewrite (data source v1 only). I will investigate more.

@viirya (Member) left a comment:
This fix looks good, with a few comments.

override protected val excludedOnceBatches: Set[String] =
  Set(
    "PartitionPruning",
    "RewriteSubquery",
Member:
What's this for?

Contributor Author:
I discovered that this `Once` batch is not idempotent: `ColumnPruning` and `CollapseProject` can be applied multiple times after correlated IN/EXISTS subqueries are rewritten. Happy to discuss other ways to fix/improve this batch. cc @cloud-fan

Attached the plan change log for the test case:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery ===
 Aggregate [count(1) AS count(1)#164L]                                                                                                            Aggregate [count(1) AS count(1)#164L]
 +- Project                                                                                                                                       +- Project
!   +- Filter NOT exists#157 [name#117 && employer#122 && (name#152.first = name#117.first) && (employer#153.name = employer#122.company.name)]      +- Join LeftAnti, ((name#152.first = name#117.first) AND (employer#153.name = employer#122.company.name))
!      :  +- Project [null AS NULL#163, name#152, employer#153]                                                                                         :- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet
!      :     +- Relation [id#151,name#152,employer#153] parquet                                                                                         +- Project [null AS NULL#163, name#152, employer#153]
!      +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet                               +- Relation [id#151,name#152,employer#153] parquet
           
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
 Aggregate [count(1) AS count(1)#164L]                                                                                         Aggregate [count(1) AS count(1)#164L]
 +- Project                                                                                                                    +- Project
!   +- Join LeftAnti, ((name#152.first = name#117.first) AND (employer#153.name = employer#122.company.name))                     +- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169))
!      :- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet         :- Project [id#116, name#117.first AS _extract_first#167, address#118, pets#119, friends#120, relatives#121, employer#122.company.name AS _extract_name#169, relations#123, p#124]
!      +- Project [null AS NULL#163, name#152, employer#153]                                                                         :  +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet
!         +- Relation [id#151,name#152,employer#153] parquet                                                                         +- Project [_extract_first#166, _extract_name#168]
!                                                                                                                                       +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]
!                                                                                                                                          +- Project [name#152, employer#153]
!                                                                                                                                             +- Relation [id#151,name#152,employer#153] parquet
           
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===
 Aggregate [count(1) AS count(1)#164L]                                                                                                                                                      Aggregate [count(1) AS count(1)#164L]
 +- Project                                                                                                                                                                                 +- Project
    +- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169))                                                                                  +- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169))
       :- Project [id#116, name#117.first AS _extract_first#167, address#118, pets#119, friends#120, relatives#121, employer#122.company.name AS _extract_name#169, relations#123, p#124]         :- Project [id#116, name#117.first AS _extract_first#167, address#118, pets#119, friends#120, relatives#121, employer#122.company.name AS _extract_name#169, relations#123, p#124]
       :  +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet                                                                   :  +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet
!      +- Project [_extract_first#166, _extract_name#168]                                                                                                                                         +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]
!         +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]                                                                                                  +- Relation [id#151,name#152,employer#153] parquet
!            +- Project [name#152, employer#153]                                                                                                                                            
!               +- Relation [id#151,name#152,employer#153] parquet                                                                                                                          
           
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
 Aggregate [count(1) AS count(1)#164L]                                                                                                                                                      Aggregate [count(1) AS count(1)#164L]
 +- Project                                                                                                                                                                                 +- Project
    +- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169))                                                                                  +- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169))
!      :- Project [id#116, name#117.first AS _extract_first#167, address#118, pets#119, friends#120, relatives#121, employer#122.company.name AS _extract_name#169, relations#123, p#124]         :- Project [_extract_first#167, _extract_name#169]
!      :  +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet                                                                   :  +- Project [name#117.first AS _extract_first#167, employer#122.company.name AS _extract_name#169]
!      +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]                                                                                                  :     +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet
!         +- Relation [id#151,name#152,employer#153] parquet                                                                                                                                      +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]
!                                                                                                                                                                                                    +- Relation [id#151,name#152,employer#153] parquet           

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===
 Aggregate [count(1) AS count(1)#164L]                                                                                               Aggregate [count(1) AS count(1)#164L]
 +- Project                                                                                                                          +- Project
    +- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169))                           +- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169))
!      :- Project [_extract_first#167, _extract_name#169]                                                                                  :- Project [name#117.first AS _extract_first#167, employer#122.company.name AS _extract_name#169]
!      :  +- Project [name#117.first AS _extract_first#167, employer#122.company.name AS _extract_name#169]                                :  +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet
!      :     +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet         +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]
!      +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]                                              +- Relation [id#151,name#152,employer#153] parquet
!         +- Relation [id#151,name#152,employer#153] parquet                                                                         
           
=== Result of Batch RewriteSubquery ===
 Aggregate [count(1) AS count(1)#164L]                                                                                                            Aggregate [count(1) AS count(1)#164L]
 +- Project                                                                                                                                       +- Project
!   +- Filter NOT exists#157 [name#117 && employer#122 && (name#152.first = name#117.first) && (employer#153.name = employer#122.company.name)]      +- Join LeftAnti, ((_extract_first#166 = _extract_first#167) AND (_extract_name#168 = _extract_name#169))
!      :  +- Project [null AS NULL#163, name#152, employer#153]                                                                                         :- Project [name#117.first AS _extract_first#167, employer#122.company.name AS _extract_name#169]
!      :     +- Relation [id#151,name#152,employer#153] parquet                                                                                         :  +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet
!      +- Relation [id#116,name#117,address#118,pets#119,friends#120,relatives#121,employer#122,relations#123,p#124] parquet                            +- Project [name#152.first AS _extract_first#166, employer#153.name AS _extract_name#168]
!        
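
For context on what this exclusion controls: in testing mode, `RuleExecutor` re-runs a converged `Once` batch and fails if the plan changes again; batches listed in `excludedOnceBatches` skip that check. A simplified fragment of the mechanism (a sketch, not verbatim Spark code):

```scala
// Simplified from Spark's RuleExecutor.execute.
if (batch.strategy == Once && Utils.isTesting &&
    !excludedOnceBatches.contains(batch.name)) {
  // Re-apply the batch once more; a Once batch must be a no-op the second
  // time, otherwise "Once strategy's idempotence is broken" is thrown.
  checkBatchIdempotence(batch, curPlan)
}
```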

@viirya (Member), Apr 25, 2022:
We didn't find this before because we didn't have test coverage?

@viirya previously approved these changes (Apr 25, 2022)
@viirya (Member) commented Apr 25, 2022:

Hmm, there seems to be a related test failure:

[info] - case insensitivity with scala reflection *** FAILED *** (89 milliseconds)
[info]   java.lang.IllegalArgumentException: b does not exist. Available: a, B, n
[info]   at org.apache.spark.sql.types.StructType.$anonfun$apply$1(StructType.scala:282)
[info]   at scala.collection.immutable.Map$Map3.getOrElse(Map.scala:336)
[info]   at org.apache.spark.sql.types.StructType.apply(StructType.scala:281)
[info]   at org.apache.spark.sql.catalyst.expressions.ProjectionOverSchema.getProjection(ProjectionOverSchema.scala:39)
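
The exception comes from the name-based field lookup: `StructType.apply(name)` is case-sensitive, so an attribute that analysis resolved case-insensitively can miss the field. A minimal reproduction of the lookup failure itself:

```scala
import org.apache.spark.sql.types.StructType

val schema = StructType.fromDDL("a INT, B INT, n INT")
schema("B")  // succeeds: exact-case match
schema("b")  // throws IllegalArgumentException: b does not exist. Available: a, B, n
```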

@viirya dismissed their stale review (April 25, 2022 23:23): "there are test failures"

@viirya (Member) left a comment:

Looks good. Just want to make sure the `excludedOnceBatches` change is not caused by this.

@allisonwang-db (Contributor Author):

> Just want to make sure the `excludedOnceBatches` change is not caused by this.

@viirya That's correct. It is not caused by this PR. The new test case happens to expose the idempotency issue that was not discovered before.

@viirya (Member) commented Apr 27, 2022:

@allisonwang-db I saw you only list 3.3.0 in "Affects Versions". But I guess this should also be an issue in 3.2?

@allisonwang-db (Contributor Author):

@viirya Yes! This fix also needs to be in 3.0/3.1/3.2.

@viirya (Member) commented Apr 27, 2022:

Thanks. Merging to master/3.3.

@viirya closed this in 150434b (Apr 27, 2022)
viirya pushed a commit that referenced this pull request Apr 27, 2022
… that do not belong to the current relation

### What changes were proposed in this pull request?
This PR updates `ProjectionOverSchema`  to use the outputs of the data source relation to filter the attributes in the nested schema pruning. This is needed because the attributes in the schema do not necessarily belong to the current data source relation. For example, if a filter contains a correlated subquery, then the subquery's children can contain attributes from both the inner query and the outer query. Since the `RewriteSubquery` batch happens after early scan pushdown rules, nested schema pruning can wrongly use the inner query's attributes to prune the outer query data schema, thus causing wrong results and unexpected exceptions.

### Why are the changes needed?

To fix a bug in `SchemaPruning`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes #36216 from allisonwang-db/spark-38918-nested-column-pruning.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit 150434b)
Signed-off-by: Liang-Chi Hsieh <[email protected]>
@viirya (Member) commented Apr 27, 2022:

@allisonwang-db There are conflicts in 3.2/3.1/3.0. Can you create separate PR(s) for them?

allisonwang-db added a commit to allisonwang-db/spark that referenced this pull request Apr 27, 2022
… that do not belong to the current relation

Closes apache#36216 from allisonwang-db/spark-38918-nested-column-pruning.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit 150434b)
Signed-off-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit 793ba60)
Signed-off-by: allisonwang-db <[email protected]>
allisonwang-db added a commit to allisonwang-db/spark that referenced this pull request Apr 27, 2022
… that do not belong to the current relation

Closes apache#36216 from allisonwang-db/spark-38918-nested-column-pruning.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit 150434b)
Signed-off-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit 793ba60)
Signed-off-by: allisonwang-db <[email protected]>
viirya pushed a commit that referenced this pull request Apr 28, 2022
…butes that do not belong to the current relation

Backport #36216 to branch-3.1.

Closes #36387 from allisonwang-db/spark-38918-branch-3.1.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
viirya pushed a commit that referenced this pull request May 1, 2022
…butes that do not belong to the current relation

Backport #36216 to branch-3.0.

Closes #36388 from allisonwang-db/spark-38918-branch-3.0.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
allisonwang-db added a commit to allisonwang-db/spark that referenced this pull request Jun 10, 2022
… that do not belong to the current relation

Closes apache#36216 from allisonwang-db/spark-38918-nested-column-pruning.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit 150434b)
Signed-off-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit 793ba60)
Signed-off-by: allisonwang-db <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Jun 10, 2022
…butes that do not belong to the current relation

Backport #36216 to branch-3.2.

Closes #36386 from allisonwang-db/spark-38918-branch-3.2.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
…butes that do not belong to the current relation

Backport apache#36216 to branch-3.2.

Closes apache#36386 from allisonwang-db/spark-38918-branch-3.2.

Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>