@@ -292,6 +292,14 @@ case class AttributeReference(
    }
  }

  def withMetadata(newMetadata: Metadata): AttributeReference = {
    if (metadata == newMetadata) {
      this
    } else {
      AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier, isGenerated)
    }
  }

  override protected final def otherCopyArgs: Seq[AnyRef] = {
    exprId :: qualifier :: isGenerated :: Nil
  }
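
A quick usage sketch of the new method (not part of the patch; the metadata key "note" is made up for illustration). The point is that `withMetadata` swaps only the metadata and threads the existing `exprId` and `qualifier` through the curried constructor, so references to the attribute elsewhere in the plan still resolve:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder}

// "note" is an arbitrary illustrative key, not one Spark itself uses.
val meta = new MetadataBuilder().putString("note", "from dataSchema").build()

val attr = AttributeReference("c", IntegerType)()   // empty metadata, fresh exprId
val tagged = attr.withMetadata(meta)

assert(tagged.exprId == attr.exprId)                // same id, so plan references still match
assert(tagged.metadata.getString("note") == "from dataSchema")
assert(attr.withMetadata(attr.metadata) eq attr)    // no-op case returns the same instance
```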
@@ -84,7 +84,14 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
      logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")

      val dataColumns =
        l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver)
        l.resolve(files.dataSchema, files.sparkSession.sessionState.analyzer.resolver).map { c =>
          files.dataSchema.find(_.name == c.name).map { f =>
            c match {
              case a: AttributeReference => a.withMetadata(f.metadata)
              case _ => c
            }
          }.getOrElse(c)
        }

      // Partition keys are not available in the statistics of the files.
      val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
@@ -350,6 +350,11 @@ private[sql] class ParquetFileFormat
      val hadoopAttemptContext =
        new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)

      // Try to push down filters when filter push-down is enabled.
      // Notice: this push-down is at the row-group level, not for individual records.
Contributor: Can you provide a link to the doc saying it is row-group level?

Contributor: (It is not obvious that this is only at the row-group level.)

Contributor: Also, does Parquet support row-group-level predicate evaluation?

Member (Author): We use org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups in SpecificParquetRecordReaderBase to do the filtering.

The implementation of RowGroupFilter is at https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java.

From this, it looks like it does the filtering at the row-group level.
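
For reference, a sketch of how the pushed predicate is built and handed off, mirroring the flatMap/reduceOption pattern used in this file (ParquetFilters is package-private to the SQL module, so this only compiles inside that package; the final call is the same setFilterPredicate shown in the diff below):

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Convert whatever source filters have a Parquet equivalent and AND them
// together; filters that cannot be converted are simply dropped, because
// ParquetFilters.createFilter returns an Option.
def pushedPredicate(dataSchema: StructType, filters: Seq[Filter]): Option[FilterPredicate] =
  filters
    .flatMap(ParquetFilters.createFilter(dataSchema, _))
    .reduceOption(FilterApi.and)

// Spark only registers the predicate with ParquetInputFormat; parquet-mr's
// RowGroupFilter later checks it against each row group's column statistics
// and skips entire row groups. Records inside the surviving row groups are
// still returned and re-evaluated by Spark's own Filter operator.
// pushedPredicate(schema, filters).foreach(
//   ParquetInputFormat.setFilterPredicate(hadoopConf, _))
```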

Member (Author): Besides, we use the metadata in the merged schema to figure out whether a field is optional (i.e., not present in all Parquet files) when deciding whether to push down a filter on it, and that information is currently dropped in FileSourceStrategy. Without the fix in this change, the pushed-down row-group-level filtering fails because the field does not exist in the Parquet file.
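
To make that concrete, here is a small sketch of the scenario; the metadata key name and the helper are illustrative only, not the actual internals. When the merged schema marks a column as optional because some part-files lack it, the filter-building code needs to see that marker on the resolved attributes; if FileSourceStrategy drops the metadata, the predicate is pushed down and the row-group filter then trips over a column that does not exist in those files.

```scala
import org.apache.spark.sql.sources.{Filter, GreaterThan}
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField, StructType}

// Hypothetical marker meaning "this column is missing from some part-files";
// the real key is internal to the SQL module.
val OptionalKey = "optionalField"

val mergedSchema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", IntegerType, nullable = true,
    metadata = new MetadataBuilder().putBoolean(OptionalKey, true).build())))

// Only push a predicate when its column exists in every file; pushing
// GreaterThan("b", 1) to a file without "b" would make the row-group
// filter fail on a non-existent column.
def safeToPush(schema: StructType, filter: Filter): Boolean = filter match {
  case GreaterThan(col, _) =>
    schema.find(_.name == col).exists { f =>
      !(f.metadata.contains(OptionalKey) && f.metadata.getBoolean(OptionalKey))
    }
  case _ => false  // simplification: only GreaterThan is considered here
}

assert(safeToPush(mergedSchema, GreaterThan("a", 1)))
assert(!safeToPush(mergedSchema, GreaterThan("b", 1)))
```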

      pushed.foreach {
        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _)
      }
      val parquetReader = if (enableVectorizedReader) {
        val vectorizedReader = new VectorizedParquetRecordReader()
        vectorizedReader.initialize(split, hadoopAttemptContext)
@@ -585,62 +590,6 @@ private[sql] object ParquetFileFormat extends Logging {
    }
  }

  /** This closure sets various Parquet configurations at both driver side and executor side. */
  private[parquet] def initializeLocalJobFunc(
      requiredColumns: Array[String],
      filters: Array[Filter],
      dataSchema: StructType,
      parquetBlockSize: Long,
      useMetadataCache: Boolean,
      parquetFilterPushDown: Boolean,
      assumeBinaryIsString: Boolean,
      assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
    val conf = job.getConfiguration
    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)

    // Try to push down filters when filter push-down is enabled.
    if (parquetFilterPushDown) {
      filters
        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
        // is used here.
        .flatMap(ParquetFilters.createFilter(dataSchema, _))
        .reduceOption(FilterApi.and)
        .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
    }

    conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
      val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
      CatalystSchemaConverter.checkFieldNames(requestedSchema).json
    })

    conf.set(
      CatalystWriteSupport.SPARK_ROW_SCHEMA,
      CatalystSchemaConverter.checkFieldNames(dataSchema).json)

    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)

    // Sets flags for `CatalystSchemaConverter`
    conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
    conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)

    overrideMinSplitSize(parquetBlockSize, conf)
  }

  /** This closure sets input paths at the driver side. */
  private[parquet] def initializeDriverSideJobFunc(
      inputFiles: Array[FileStatus],
      parquetBlockSize: Long)(job: Job): Unit = {
    // We set the input paths at the driver side.
    logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
    if (inputFiles.nonEmpty) {
      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
    }

    overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
  }

  private[parquet] def readSchema(
      footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
