@@ -345,24 +345,34 @@ private[sql] class ParquetRelation2(
// Schema of the whole table, including partition columns.
var schema: StructType = _

// Cached leaves
var cachedLeaves: Set[FileStatus] = null

/**
* Refreshes `FileStatus`es, footers, partition spec, and table schema.
*/
def refresh(): Unit = {
// Lists `FileStatus`es of all leaf nodes (files) under all base directories.
val leaves = cachedLeafStatuses().filter { f =>
isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
}.toArray

dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
commonMetadataStatuses =
leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)

// If we already have the schema, we don't need to re-compute it, since the schema merging is
// time-consuming.
if (dataSchema == null) {
val currentLeafStatuses = cachedLeafStatuses()

// Check whether the cached leaf statuses have changed
val leafStatusesChanged = (cachedLeaves == null) ||
!cachedLeaves.equals(currentLeafStatuses)

if (leafStatusesChanged) {
cachedLeaves = currentLeafStatuses.toIterator.toSet

// Lists `FileStatus`es of all leaf nodes (files) under all base directories.
val leaves = currentLeafStatuses.filter { f =>
Contributor

This is irrelevant to this PR, but I'd like to point out that this check is unnecessary now. IIRC, at the time PR #6012 was merged, ParquetRelation2 still needed to be serialized to the executor side, so avoiding schema merging on the executor side made sense. However, after migrating to HadoopFsRelation, ParquetRelation2 is no longer serialized at all. You may notice that ParquetRelation2 is no longer a case class and doesn't even extend Serializable now. I made this change intentionally to ensure ParquetRelation2 is never serialized.
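A minimal sketch (hypothetical class name, not part of this PR) of why dropping Serializable guarantees the class stays on the driver: Spark's closure cleaner rejects any task closure capturing a non-serializable object before the job even starts.

// Hypothetical stand-in for a driver-only relation class.
class DriverOnlyRelation {  // intentionally does NOT extend Serializable
  val schemaJson: String = "{}"
}

val relation = new DriverOnlyRelation
// Capturing `relation` in a task closure fails fast on the driver:
// sc.parallelize(1 to 4).map(_ => relation.schemaJson).collect()
// => org.apache.spark.SparkException: Task not serializable
//    Caused by: java.io.NotSerializableException: DriverOnlyRelation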

isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
}.toArray

dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
metadataStatuses =
leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
commonMetadataStatuses =
leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)

dataSchema = {
val dataSchema0 = maybeDataSchema
.orElse(readSchema())
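For context, here is a self-contained sketch (hypothetical names; assumes hadoop-common on the classpath) of the caching pattern this diff introduces: remember the last observed set of leaf FileStatuses and redo the expensive listing, filtering, and schema merging only when that set actually changes.

import org.apache.hadoop.fs.FileStatus

class LeafStatusCache(listLeaves: () => Seq[FileStatus]) {
  // Last observed set of leaf files; null until the first refresh.
  private var cachedLeaves: Set[FileStatus] = null

  // Runs `onChange` (the expensive part, e.g. schema merging) only when the
  // listing differs from the cached one; returns whether a refresh happened.
  def refreshIfChanged(onChange: Seq[FileStatus] => Unit): Boolean = {
    val current = listLeaves()
    val changed = cachedLeaves == null || cachedLeaves != current.toSet
    if (changed) {
      cachedLeaves = current.toSet
      onChange(current)
    }
    changed
  }
}

// Usage sketch: cache.refreshIfChanged(leaves => mergeSchemas(leaves)),
// where mergeSchemas is a hypothetical expensive callback.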