
Commit 6b10ea5

cloud-fan authored and yhuai committed
[SPARK-10829] [SPARK-11301] [SQL] fix 2 bugs for filter on partitioned columns (1.5 backport)
[SPARK-10829](#8916) Filter combine partition key and attribute doesn't work in DataSource scan
[SPARK-11301](#9271) fix case sensitivity for filter on partitioned columns

Author: Wenchen Fan <[email protected]>

This patch had conflicts when merged, resolved by
Committer: Yin Huai <[email protected]>

Closes #9371 from cloud-fan/branch-1.5.
1 parent 06d3257 commit 6b10ea5
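
For context, here is a minimal, self-contained sketch of the kind of query SPARK-10829 is about: a single predicate that mixes a partition column and a data column (here inside an OR), so it can neither drive partition pruning on its own nor be pushed down to the data source. The paths, table layout, and expected plan shape are illustrative assumptions, not taken from this commit.

// Sketch only (Spark 1.5 API); the table layout and paths are hypothetical.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object MixedPartitionFilterExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("mixed-filter").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Write a tiny table partitioned by "year" to an illustrative temp path.
    val path = java.nio.file.Files.createTempDirectory("mixed-filter").toString + "/table"
    Seq((2012, "a"), (2013, "b"), (2013, "c")).toDF("year", "val")
      .write.partitionBy("year").parquet(path)

    val df = sqlContext.read.parquet(path)

    // "year" is a partition column, "val" is a data column. This OR predicate
    // references both, so it can only be evaluated as a Filter above the scan;
    // this commit makes scan planning handle such predicates correctly.
    val mixed = df.filter($"year" === 2012 || $"val" === "b")
    mixed.explain()            // expect a Filter node above the partitioned scan
    assert(mixed.count() == 2) // (2012, "a") and (2013, "b")

    sc.stop()
  }
}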

2 files changed: 30 additions, 12 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 20 additions & 12 deletions
@@ -62,7 +62,20 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
    // Scanning partitioned HadoopFsRelation
    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
        if t.partitionSpec.partitionColumns.nonEmpty =>
-      val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
+      // We divide the filter expressions into 3 parts
+      val partitionColumns = AttributeSet(
+        t.partitionColumns.map(c => l.output.find(_.name == c.name).get))
+
+      // Only pruning the partition keys
+      val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
+
+      // Only pushes down predicates that do not reference partition keys.
+      val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
+
+      // Predicates with both partition keys and attributes
+      val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
+
+      val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray

      logInfo {
        val total = t.partitionSpec.partitions.length
@@ -71,21 +84,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
        s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
      }

-      // Only pushes down predicates that do not reference partition columns.
-      val pushedFilters = {
-        val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
-        filters.filter { f =>
-          val referencedColumnNames = f.references.map(_.name).toSet
-          referencedColumnNames.intersect(partitionColumnNames).isEmpty
-        }
-      }
-
-      buildPartitionedTableScan(
+      val scan = buildPartitionedTableScan(
        l,
        projects,
        pushedFilters,
        t.partitionSpec.partitionColumns,
-        selectedPartitions) :: Nil
+        selectedPartitions)
+
+      combineFilters
+        .reduceLeftOption(expressions.And)
+        .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil

    // Scanning non-partitioned HadoopFsRelation
    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
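
The heart of the change above is the three-way split of the candidate filters. A simplified, self-contained model of that split (plain Scala sets standing in for Catalyst's Expression and AttributeSet; the Predicate class and its fields are invented for illustration) might look like this:

// A simplified model of the three-way filter split (not the Catalyst API):
// each predicate just records which column names it references.
case class Predicate(description: String, references: Set[String])

object FilterSplitModel extends App {
  val partitionColumns = Set("year")

  val filters = Seq(
    Predicate("year = 2012", Set("year")),                      // partition-only
    Predicate("val = 'a'", Set("val")),                         // data-only
    Predicate("year = 2012 OR val = 'a'", Set("year", "val"))   // mixes both
  )

  // References only partition keys: usable for partition pruning.
  val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))

  // References no partition keys: safe to push down to the data source.
  val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)

  // Everything else references both kinds of columns and must be evaluated
  // as a Filter on top of the partitioned scan.
  val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet

  println(s"prune with:    $partitionFilters")
  println(s"push down:     $pushedFilters")
  println(s"filter on top: $combineFilters")
}

Predicates that fall into the third group are the ones the patch now wraps in an execution.Filter on top of the scan returned by buildPartitionedTableScan.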

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 10 additions & 0 deletions
@@ -937,4 +937,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
      expected(except)
    )
  }
+
+  test("SPARK-11301: fix case sensitivity for filter on partitioned columns") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      withTempPath { path =>
+        Seq(2012 -> "a").toDF("year", "val").write.partitionBy("year").parquet(path.getAbsolutePath)
+        val df = sqlContext.read.parquet(path.getAbsolutePath)
+        checkAnswer(df.filter($"yEAr" > 2000).select($"val"), Row("a"))
+      }
+    }
+  }
}
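
To try the SPARK-11301 scenario outside the test harness (withSQLConf and withTempPath are helpers from Spark's test utilities), a roughly equivalent standalone sketch, with an illustrative temp path, could look like this:

// Sketch of the new test's scenario without the test-suite helpers (Spark 1.5 API).
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object CaseInsensitivePartitionFilterCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("spark-11301").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Equivalent of withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { ... }
    sqlContext.setConf("spark.sql.caseSensitive", "false")

    // Equivalent of withTempPath { path => ... }
    val path = java.nio.file.Files.createTempDirectory("spark-11301").toString + "/table"
    Seq(2012 -> "a").toDF("year", "val").write.partitionBy("year").parquet(path)

    val df = sqlContext.read.parquet(path)

    // The partition column is written as "year" but filtered as "yEAr";
    // with case-insensitive resolution this should return the single row ("a").
    val result = df.filter($"yEAr" > 2000).select($"val").collect()
    assert(result.map(_.getString(0)).toSeq == Seq("a"))

    sc.stop()
  }
}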
