@@ -130,10 +130,9 @@ case class ParquetScanBuilder(
       // are combined with filter or group by
       // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
       //      SELECT COUNT(col1) FROM t GROUP BY col2
-      // Todo: 1. add support if groupby column is partition col
-      //          (https://issues.apache.org/jira/browse/SPARK-36646)
-      //       2. add support if filter col is partition col
-      //          (https://issues.apache.org/jira/browse/SPARK-36647)
+      // However, if the filter is on partition column, max/min/count can still be pushed down
+      // Todo: add support if groupby column is partition col
+      //       (https://issues.apache.org/jira/browse/SPARK-36646)
       return false
     }

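The updated comment above carries the substance of the change: a filter that touches only partition columns is resolved by partition pruning, so it no longer blocks MAX/MIN/COUNT pushdown, while a filter on a data column still does. Below is a minimal sketch of that behavior, not part of the patch: the object name, local session, temp directory, and example queries on columns `id`/`p` are illustrative assumptions; the config constants are the ones the suite itself uses.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

// Sketch only: demonstrates the comment above with a throwaway local session.
object PartitionFilterPushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("aggregate-pushdown-sketch")
      // Aggregate pushdown applies on the DSv2 read path, so parquet is removed
      // from the V1 source list here (assumption: otherwise default config).
      .config(SQLConf.USE_V1_SOURCE_LIST.key, "")
      .config(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key, "true")
      .getOrCreate()

    val dir = java.nio.file.Files.createTempDirectory("agg-pushdown").toString
    spark.range(10).selectExpr("id", "id % 3 as p")
      .write.partitionBy("p").parquet(dir)
    spark.read.parquet(dir).createOrReplaceTempView("t")

    // Filter on the data column `id`: the aggregates are not pushed down.
    spark.sql("SELECT MAX(id), MIN(id), COUNT(id) FROM t WHERE id > 0").explain()

    // Filter on the partition column `p`: pruning absorbs the filter, so the
    // explain output should show a PushedAggregation fragment.
    spark.sql("SELECT MAX(id), MIN(id), COUNT(id) FROM t WHERE p = 0").explain()

    spark.stop()
  }
}
```

With pushdown enabled, the second query can be answered from the Parquet footer statistics of the pruned partition alone, which is exactly what the new test below asserts via the `PushedAggregation` plan fragment.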
@@ -129,10 +129,9 @@ abstract class ParquetAggregatePushDownSuite
         .write.partitionBy("p").parquet(dir.getCanonicalPath)
       withTempView("tmp") {
         spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
-        val enableVectorizedReader = Seq("false", "true")
-        for (testVectorizedReader <- enableVectorizedReader) {
+        Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-            vectorizedReaderEnabledKey -> testVectorizedReader) {
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
             val count = sql("SELECT COUNT(p) FROM tmp")
             count.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
@@ -221,7 +220,7 @@ abstract class ParquetAggregatePushDownSuite
     }
   }

-  test("aggregate push down - query with filter not push down") {
+  test("aggregate push down - aggregate with data filter cannot be pushed down") {
     val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), (0, null, 19),
       (9, "mno", 7), (2, null, 7))
     withParquetTable(data, "t") {
@@ -240,6 +239,29 @@
     }
   }

+  test("aggregate push down - aggregate with partition filter can be pushed down") {
+    withTempPath { dir =>
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").parquet(dir.getCanonicalPath)
+      withTempView("tmp") {
+        spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
+        Seq("false", "true").foreach { enableVectorizedReader =>
+          withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
+            val max = sql("SELECT max(id), min(id), count(id) FROM tmp WHERE p = 0")
+            max.queryExecution.optimizedPlan.collect {
+              case _: DataSourceV2ScanRelation =>
+                val expected_plan_fragment =
+                  "PushedAggregation: [MAX(id), MIN(id), COUNT(id)]"
+                checkKeywordsExistsInExplain(max, expected_plan_fragment)
+            }
+            checkAnswer(max, Seq(Row(9, 0, 4)))
+          }
+        }
+      }
+    }
+  }
+
   test("aggregate push down - push down only if all the aggregates can be pushed down") {
     val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), (0, null, 19),
       (9, "mno", 7), (2, null, 7))
@@ -356,10 +378,9 @@ abstract class ParquetAggregatePushDownSuite
       spark.createDataFrame(rdd, schema).write.parquet(file.getCanonicalPath)
       withTempView("test") {
         spark.read.parquet(file.getCanonicalPath).createOrReplaceTempView("test")
-        val enableVectorizedReader = Seq("false", "true")
-        for (testVectorizedReader <- enableVectorizedReader) {
+        Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-            vectorizedReaderEnabledKey -> testVectorizedReader) {
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {

             val testMinWithTS = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
               "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
@@ -477,10 +498,9 @@ abstract class ParquetAggregatePushDownSuite
   }

   test("aggregate push down - column name case sensitivity") {
-    val enableVectorizedReader = Seq("false", "true")
-    for (testVectorizedReader <- enableVectorizedReader) {
+    Seq("false", "true").foreach { enableVectorizedReader =>
       withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-        vectorizedReaderEnabledKey -> testVectorizedReader) {
+        vectorizedReaderEnabledKey -> enableVectorizedReader) {
         withTempPath { dir =>
           spark.range(10).selectExpr("id", "id % 3 as p")
             .write.partitionBy("p").parquet(dir.getCanonicalPath)
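Taken together, the hunks above amount to one rule: aggregates may be pushed down as long as there is no group-by (still pending SPARK-36646) and every remaining filter references only partition columns. The following is a conceptual sketch of that condition, not the actual ParquetScanBuilder code; the object, method name, and parameters are illustrative assumptions.

```scala
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Conceptual only: mirrors the comment in ParquetScanBuilder, not its real implementation.
object PushdownConditionSketch {
  def aggregatesCanBePushedDown(
      hasGroupBy: Boolean,
      filters: Seq[Filter],
      partitionSchema: StructType): Boolean = {
    val partitionCols = partitionSchema.fieldNames.map(_.toLowerCase).toSet
    // Group-by still blocks pushdown (SPARK-36646). Filters are harmless only if
    // every column they reference is a partition column, because those rows are
    // eliminated by partition pruning before footer statistics are consulted.
    !hasGroupBy && filters.forall(_.references.forall(c => partitionCols.contains(c.toLowerCase)))
  }
}
```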