@@ -77,7 +77,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
 
       // Predicates with both partition keys and attributes
-      val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
+      val partitionAndNormalColumnFilters =
+        filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
 
       val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
 
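The hunk above sorts every predicate into exactly one of three buckets: predicates over partition columns only (used for pruning), predicates over normal columns only (pushed down to the data source), and predicates mixing both (which must run above the scan). A minimal self-contained sketch of that split, using a toy Predicate type with plain Set[String] references as a stand-in for Catalyst's Expression and AttributeSet:

object FilterSplitSketch extends App {
  // Toy stand-in for Catalyst's Expression: SQL text plus the columns it references.
  case class Predicate(sql: String, references: Set[String])

  val partitionColumns = Set("a")
  val filters = Seq(
    Predicate("a > 1 or b < 2", Set("a", "b")), // mixes partition and normal columns
    Predicate("c = 3", Set("c")),               // normal columns only
    Predicate("a = 2", Set("a")))               // partition column only

  // Partition-column-only predicates: used to prune partitions.
  val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
  // Predicates touching no partition column: candidates for source pushdown.
  val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
  // Everything else must be evaluated by a Filter above the scan.
  val partitionAndNormalColumnFilters =
    filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet

  println(partitionFilters.map(_.sql))                // List(a = 2)
  println(pushedFilters.map(_.sql))                   // List(c = 3)
  println(partitionAndNormalColumnFilters.map(_.sql)) // Set(a > 1 or b < 2)
}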
@@ -88,16 +89,33 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
}

// need to add projections from "partitionAndNormalColumnAttrs" in if it is not empty
val partitionAndNormalColumnAttrs = AttributeSet(partitionAndNormalColumnFilters)
val partitionAndNormalColumnProjs = if (partitionAndNormalColumnAttrs.isEmpty) {
projects
} else {
(partitionAndNormalColumnAttrs ++ projects).toSeq
}

val scan = buildPartitionedTableScan(
l,
projects,
partitionAndNormalColumnProjs,
pushedFilters,
t.partitionSpec.partitionColumns,
selectedPartitions)

combineFilters
.reduceLeftOption(expressions.And)
.map(execution.Filter(_, scan)).getOrElse(scan) :: Nil
// Add a Projection to guarantee the original projection:
// this is because "partitionAndNormalColumnAttrs" may be different
// from the original "projects", in elements or their ordering

partitionAndNormalColumnFilters.reduceLeftOption(expressions.And).map(cf =>
if (projects.isEmpty || projects == partitionAndNormalColumnProjs) {
// if the original projection is empty, no need for the additional Project either
execution.Filter(cf, scan)
} else {
execution.Project(projects, execution.Filter(cf, scan))
}
).getOrElse(scan) :: Nil

// Scanning non-partitioned HadoopFsRelation
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
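The rewritten plan construction yields one of three shapes, depending on whether a mixed predicate exists and whether the scan's widened projection still matches the original one. A minimal sketch with a toy plan ADT standing in for Spark's physical operators (illustrative names only, not Spark's API):

object PlanShapeSketch extends App {
  // Toy plan ADT standing in for Spark's physical plans.
  sealed trait Plan
  case class Scan(output: Seq[String]) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Project(output: Seq[String], child: Plan) extends Plan

  // combinedFilter models the AND-reduced partitionAndNormalColumnFilters;
  // scanOutput models partitionAndNormalColumnProjs.
  def build(scan: Scan, combinedFilter: Option[String],
            projects: Seq[String], scanOutput: Seq[String]): Plan =
    combinedFilter match {
      case None => scan // no mixed predicate: the scan alone is enough
      case Some(cf) if projects.isEmpty || projects == scanOutput =>
        Filter(cf, scan) // scan output already matches what the query asked for
      case Some(cf) =>
        Project(projects, Filter(cf, scan)) // restore elements and their ordering
    }

  val scan = Scan(Seq("b", "a", "c", "d"))
  println(build(scan, Some("a > 1 or b > 2"), Seq("a", "b", "c", "d"), scan.output))
  // Project(List(a, b, c, d), Filter(a > 1 or b > 2, Scan(List(b, a, c, d))))
}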
@@ -323,6 +323,47 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-12231: test the filter and empty project in partitioned DataSource scan") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}"
+        (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d").
+          write.partitionBy("a").parquet(path)
+
+        // The filter "a > 1 or b < 2" references both the partition column "a" and the
+        // normal column "b", so it cannot be pushed down; with an empty projection, this
+        // query used to fail because the scan did not produce the attributes the Filter needs.
+        val df1 = sqlContext.read.parquet(dir.getCanonicalPath)
+
+        assert(df1.filter("a > 1 or b < 2").count() == 2)
+      }
+    }
+  }
+
+  test("SPARK-12231: test the new projection in partitioned DataSource scan") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}"
+        (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d").
+          write.partitionBy("a").parquet(path)
+
+        // Test the case where a new projection is generated,
+        // i.e. when projects != partitionAndNormalColumnProjs.
+
+        val df1 = sqlContext.read.parquet(dir.getCanonicalPath)
+
+        checkAnswer(
+          df1.filter("a > 1 or b > 2").orderBy("a").selectExpr("a", "b", "c", "d"),
+          (2 to 3).map(i => Row(i, i + 1, i + 2, i + 3)))
+      }
+    }
+  }
+
+
   test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
     import testImplicits._
 
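The second test targets the ordering hazard: the union of the filter's attributes with the projected attributes need not preserve the original column order, which is exactly what forces the extra Project. A rough illustration, using an insertion-ordered set as a stand-in for AttributeSet (whose iteration order likewise need not match the projection):

object ProjectionOrderSketch extends App {
  import scala.collection.mutable.LinkedHashSet

  val projects = Seq("a", "b", "c", "d")    // what the query asked for
  val filterAttrs = LinkedHashSet("b", "a") // attributes of "a > 1 or b > 2"

  // The scan is asked for the union, whose ordering can differ from `projects`:
  val scanOutput = (filterAttrs ++ projects).toList
  println(scanOutput)             // List(b, a, c, d)
  println(scanOutput == projects) // false: hence the extra Project on top
}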