@@ -50,7 +50,7 @@ case class ParquetScanBuilder(
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetSchema =
-      new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(schema)
Member commented:
What if the partition column is in the file? I guess this might be why we push down the partition filter.

@LuciferYang (Contributor Author) commented on Dec 8, 2020:
This is a good question, but it seems that the filters pushed down in DataSource V1 do not contain filters on partition columns either.

The dataFilters used to construct FileSourceScanExec and passed to ParquetFileFormat.buildReaderWithPartitionValues to build the pushed filters also have the partition filters filtered out, am I right?

// Partition keys are not available in the statistics of the files.
// `dataColumns` might have partition columns, we need to filter them out.
val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionColumns.contains)
val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f =>
  if (f.references.intersect(partitionSet).nonEmpty) {
    extractPredicatesWithinOutputSet(f, AttributeSet(dataColumnsWithoutPartitionCols))
  } else {
    Some(f)
  }
}

@LuciferYang (Contributor Author) commented on Dec 8, 2020:
Maybe another way is to refer to Parquet's FileMetaData#getSchema to determine which filters should be pushed down.
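
A minimal sketch of that idea (editor's illustration; the helper name and its wiring are assumptions, not part of this PR). It reads the physical schema from the file footer and keeps only filters whose columns actually exist in the file:

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.sql.sources.Filter

// Keep only filters whose referenced columns exist in the physical
// Parquet file schema (FileMetaData#getSchema).
def filtersPushableToFile(path: Path, conf: Configuration, filters: Seq[Filter]): Seq[Filter] = {
  val reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))
  try {
    val fileSchema = reader.getFooter.getFileMetaData.getSchema
    val fileColumns = fileSchema.getFields.asScala.map(_.getName).toSet
    filters.filter(_.references.forall(fileColumns.contains))
  } finally {
    reader.close()
  }
}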

@wangyum (Member) commented on Dec 8, 2020:
Maybe this is the correct fix. I have a PR, #30670, to fix the case where a partition column is in the file.

@LuciferYang (Contributor Author) commented:

Maybe we should use readDataSchema() instead of dataSchema.

@LuciferYang (Contributor Author) commented:

Addressed in b9f8eb2: use readDataSchema() instead of dataSchema.

+      new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(readDataSchema())
     val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
       pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
     parquetFilters.convertibleFilters(this.filters).toArray
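
As I read the change, readDataSchema() excludes partition columns, so any filter that references them is no longer convertible into a Parquet predicate. A toy illustration of that pruning effect (assumed column names; this is not the real ParquetFilters API):

import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Read schema after pruning: the partition column `id` is gone.
val readSchema = StructType(Seq(StructField("value", IntegerType)))
val filters: Seq[Filter] =
  Seq(IsNotNull("id"), GreaterThan("id", 1), IsNotNull("value"), GreaterThan("value", 2))

// A filter is only convertible if every column it references exists
// in the schema handed to the converter.
val fieldNames = readSchema.fieldNames.toSet
val convertible = filters.filter(_.references.forall(fieldNames.contains))
// convertible == Seq(IsNotNull(value), GreaterThan(value,2))
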
@@ -367,7 +367,7 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
       val basePath = dir.getCanonicalPath + "/" + fmt
       val pushFilterMaps = Map (
         "parquet" ->
-          "|PushedFilers: \\[.*\\(id\\), .*\\(value\\), .*\\(id,1\\), .*\\(value,2\\)\\]",
+          "|PushedFilers: \\[IsNotNull\\(value\\), GreaterThan\\(value,2\\)\\]",
         "orc" ->
           "|PushedFilers: \\[.*\\(id\\), .*\\(value\\), .*\\(id,1\\), .*\\(value,2\\)\\]",
         "csv" ->
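
For context, a hypothetical spark-shell repro of what the updated parquet expectation matches (the path, data, and session are assumed, not taken from the test; "PushedFilers" is the literal, misspelled label the explain output used at the time):

import spark.implicits._

// Write a table partitioned by `id` with a data column `value`.
spark.range(10).withColumn("value", $"id" * 2)
  .write.partitionBy("id").parquet("/tmp/explain_demo")

// Filter on both the partition column and the data column.
spark.read.parquet("/tmp/explain_demo")
  .where($"id" > 1 && $"value" > 2)
  .explain()

// With this change the parquet scan should report only the data filters:
//   PushedFilers: [IsNotNull(value), GreaterThan(value,2)]
// rather than also listing the partition-column filters on `id`.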