Skip to content

Commit 2a996f1

Browse files
allisonwang-dbviirya
authored andcommitted
[SPARK-38918][SQL][3.1] Nested column pruning should filter out attributes that do not belong to the current relation
### What changes were proposed in this pull request? Backport #36216 to branch-3.1. ### Why are the changes needed? To fix a bug in `SchemaPruning`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes #36387 from allisonwang-db/spark-38918-branch-3.1. Authored-by: allisonwang-db <[email protected]> Signed-off-by: Liang-Chi Hsieh <[email protected]>
1 parent 30be8d0 commit 2a996f1

File tree

6 files changed

+55
-7
lines changed

6 files changed

+55
-7
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,19 @@ import org.apache.spark.sql.types._
2424
* field indexes and field counts of complex type extractors and attributes
2525
* are adjusted to fit the schema. All other expressions are left as-is. This
2626
* class is motivated by columnar nested schema pruning.
27+
*
28+
* @param schema nested column schema
29+
* @param output output attributes of the data source relation. They are used to filter out
30+
* attributes in the schema that do not belong to the current relation.
2731
*/
28-
case class ProjectionOverSchema(schema: StructType) {
32+
case class ProjectionOverSchema(schema: StructType, output: AttributeSet) {
2933
private val fieldNames = schema.fieldNames.toSet
3034

3135
def unapply(expr: Expression): Option[Expression] = getProjection(expr)
3236

3337
private def getProjection(expr: Expression): Option[Expression] =
3438
expr match {
35-
case a: AttributeReference if fieldNames.contains(a.name) =>
39+
case a: AttributeReference if fieldNames.contains(a.name) && output.contains(a) =>
3640
Some(a.copy(dataType = schema(a.name).dataType)(a.exprId, a.qualifier))
3741
case GetArrayItem(child, arrayItemOrdinal, failOnError) =>
3842
getProjection(child).map {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
5858
override protected val excludedOnceBatches: Set[String] =
5959
Set(
6060
"PartitionPruning",
61+
"RewriteSubquery",
6162
"Extract Python UDFs")
6263

6364
protected def fixedPoint =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ object ObjectSerializerPruning extends Rule[LogicalPlan] {
225225
}
226226

227227
// Builds new projection.
228-
val projectionOverSchema = ProjectionOverSchema(prunedSchema)
228+
val projectionOverSchema = ProjectionOverSchema(prunedSchema, AttributeSet(s.output))
229229
val newProjects = p.projectList.map(_.transformDown {
230230
case projectionOverSchema(expr) => expr
231231
}).map { case expr: NamedExpression => expr }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ object SchemaPruning extends Rule[LogicalPlan] {
8282
// in dataSchema.
8383
if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
8484
val prunedRelation = leafNodeBuilder(prunedDataSchema)
85-
val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
85+
val projectionOverSchema = ProjectionOverSchema(prunedDataSchema, AttributeSet(output))
8686

8787
Some(buildNewProjection(projects, normalizedProjects, normalizedFilters,
8888
prunedRelation, projectionOverSchema))

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.execution.datasources.v2
1919

20-
import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression}
20+
import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, NamedExpression, ProjectionOverSchema, SubqueryExpression}
2121
import org.apache.spark.sql.catalyst.planning.ScanOperation
2222
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
2323
import org.apache.spark.sql.catalyst.rules.Rule
@@ -67,7 +67,8 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {
6767

6868
val scanRelation = DataSourceV2ScanRelation(relation, wrappedScan, output)
6969

70-
val projectionOverSchema = ProjectionOverSchema(output.toStructType)
70+
val projectionOverSchema =
71+
ProjectionOverSchema(output.toStructType, AttributeSet(output))
7172
val projectionFunc = (expr: Expression) => expr transformDown {
7273
case projectionOverSchema(newExpr) => newExpr
7374
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,15 @@ abstract class SchemaPruningSuite
5757
contactId: Int,
5858
employer: Employer)
5959

60+
case class Employee(id: Int, name: FullName, employer: Company)
61+
6062
val janeDoe = FullName("Jane", "X.", "Doe")
6163
val johnDoe = FullName("John", "Y.", "Doe")
6264
val susanSmith = FullName("Susan", "Z.", "Smith")
6365

64-
val employer = Employer(0, Company("abc", "123 Business Street"))
66+
val company = Company("abc", "123 Business Street")
67+
68+
val employer = Employer(0, company)
6569
val employerWithNullCompany = Employer(1, null)
6670
val employerWithNullCompany2 = Employer(2, null)
6771

@@ -77,6 +81,8 @@ abstract class SchemaPruningSuite
7781
Department(1, "Marketing", 1, employerWithNullCompany) ::
7882
Department(2, "Operation", 4, employerWithNullCompany2) :: Nil
7983

84+
val employees = Employee(0, janeDoe, company) :: Employee(1, johnDoe, company) :: Nil
85+
8086
case class Name(first: String, last: String)
8187
case class BriefContact(id: Int, name: Name, address: String)
8288

@@ -580,6 +586,25 @@ abstract class SchemaPruningSuite
580586
}
581587
}
582588

589+
testSchemaPruning("SPARK-38918: nested schema pruning with correlated subqueries") {
590+
withContacts {
591+
withEmployees {
592+
val query = sql(
593+
"""
594+
|select count(*)
595+
|from contacts c
596+
|where not exists (select null from employees e where e.name.first = c.name.first
597+
| and e.employer.name = c.employer.company.name)
598+
|""".stripMargin)
599+
checkScan(query,
600+
"struct<name:struct<first:string>,employer:struct<company:struct<name:string>>>",
601+
"struct<name:struct<first:string,middle:string,last:string>," +
602+
"employer:struct<name:string,address:string>>")
603+
checkAnswer(query, Row(3))
604+
}
605+
}
606+
}
607+
583608
protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = {
584609
test(s"Spark vectorized reader - without partition data column - $testName") {
585610
withSQLConf(vectorizedReaderEnabledKey -> "true") {
@@ -660,6 +685,23 @@ abstract class SchemaPruningSuite
660685
}
661686
}
662687

688+
private def withEmployees(testThunk: => Unit): Unit = {
689+
withTempPath { dir =>
690+
val path = dir.getCanonicalPath
691+
692+
makeDataSourceFile(employees, new File(path + "/employees"))
693+
694+
// Providing user specified schema. Inferred schema from different data sources might
695+
// be different.
696+
val schema = "`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
697+
"`employer` STRUCT<`name`: STRING, `address`: STRING>"
698+
spark.read.format(dataSourceName).schema(schema).load(path + "/employees")
699+
.createOrReplaceTempView("employees")
700+
701+
testThunk
702+
}
703+
}
704+
663705
case class MixedCaseColumn(a: String, B: Int)
664706
case class MixedCase(id: Int, CoL1: String, coL2: MixedCaseColumn)
665707

0 commit comments

Comments
 (0)