From d0d866b02c4314e3d18180020a4fc0057d140824 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sun, 25 Oct 2015 23:06:03 +0800 Subject: [PATCH] fix case sensitivity for filter on partitioned columns --- .../execution/datasources/DataSourceStrategy.scala | 12 +++++------- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 10 ++++++++++ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index ffb4645b8932..af6626c89758 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -63,16 +63,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) if t.partitionSpec.partitionColumns.nonEmpty => // We divide the filter expressions into 3 parts - val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet + val partitionColumns = AttributeSet( + t.partitionColumns.map(c => l.output.find(_.name == c.name).get)) - // TODO this is case-sensitive - // Only prunning the partition keys - val partitionFilters = - filters.filter(_.references.map(_.name).toSet.subsetOf(partitionColumnNames)) + // Only pruning the partition keys + val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns)) // Only pushes down predicates that do not reference partition keys. - val pushedFilters = - filters.filter(_.references.map(_.name).toSet.intersect(partitionColumnNames).isEmpty) + val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty) // Predicates with both partition keys and attributes val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index f4c7aa34e560..5d4f1a75fd07 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -980,4 +980,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src") assert(df.select($"src.i".cast(StringType)).columns.head === "i") } + + test("SPARK-11301: fix case sensitivity for filter on partitioned columns") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + withTempPath { path => + Seq(2012 -> "a").toDF("year", "val").write.partitionBy("year").parquet(path.getAbsolutePath) + val df = sqlContext.read.parquet(path.getAbsolutePath) + checkAnswer(df.filter($"yEAr" > 2000).select($"val"), Row("a")) + } + } + } }