Commit 802cbd7 (parent: 38fe1e7)

Fixes bugs related to schema merging and empty requested columns

1 file changed: +87 -25

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -63,46 +63,108 @@ private[parquet] class RowReadSupport extends ReadSupport[InternalRow] with Logging
     log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
 
     val toCatalyst = new CatalystSchemaConverter(conf)
-    val parquetSchema = readContext.getRequestedSchema
-    val catalystSchema =
-      Option(readContext.getReadSupportMetadata)
-        .map(_.toMap)
-        .flatMap { metadata =>
-          metadata
-            // First tries to read requested schema, which may result from projections
-            .get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
-            // If not available, tries to read Catalyst schema from file metadata. It's only
-            // available if the target file is written by Spark SQL.
-            .orElse(metadata.get(RowReadSupport.SPARK_METADATA_KEY))
-        }
-        .map(StructType.fromString)
-        .getOrElse {
-          logDebug("Catalyst schema not available, falling back to Parquet message type")
-          toCatalyst.convert(parquetSchema)
-        }
+    val parquetRequestedSchema = readContext.getRequestedSchema
+
+    val catalystRequestedSchema =
+      Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
+        metadata
+          // First tries to read requested schema, which may result from projections
+          .get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+          // If not available, tries to read Catalyst schema from file metadata. It's only
+          // available if the target file is written by Spark SQL.
+          .orElse(metadata.get(RowReadSupport.SPARK_METADATA_KEY))
+      }.map(StructType.fromString).getOrElse {
+        logDebug("Catalyst schema not available, falling back to Parquet schema")
+        toCatalyst.convert(parquetRequestedSchema)
+      }
 
-    logDebug(s"Catalyst schema used to read Parquet files: $catalystSchema")
-    new RowRecordMaterializer(parquetSchema, catalystSchema)
+    logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
+    new RowRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
   }
 
   override def init(context: InitContext): ReadContext = {
     val conf = context.getConfiguration
+
+    // If the target file was written by Spark SQL, we should be able to find a serialized
+    // Catalyst schema of this file from its metadata.
     val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
+
+    // Optional schema of requested columns, in the form of a string serialized from a Catalyst
+    // `StructType` containing all requested columns.
     val maybeRequestedSchema = Option(conf.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
 
+    // Below we construct a Parquet schema containing all requested columns. This schema tells
+    // Parquet which columns to read.
+    //
+    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise,
+    // we have to fall back to the full file schema, which contains all columns in the file.
+    // Obviously this may waste IO bandwidth since it may read more columns than requested.
+    //
+    // Two things to note:
+    //
+    // 1. It's possible that some requested columns don't exist in the target Parquet file. For
+    //    example, in the case of schema merging, the globally merged schema may contain extra
+    //    columns gathered from other Parquet files. These columns will simply be filled with
+    //    nulls when actually reading the target Parquet file.
+    //
+    // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema
+    //    to a Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique
+    //    due to non-standard behaviors of some Parquet libraries/tools. For example, a Parquet
+    //    file containing a single integer array field `f1` may have the following legacy 2-level
+    //    structure:
+    //
+    //      message root {
+    //        optional group f1 (LIST) {
+    //          required INT32 element;
+    //        }
+    //      }
+    //
+    //    while `CatalystSchemaConverter` may generate a standard 3-level structure:
+    //
+    //      message root {
+    //        optional group f1 (LIST) {
+    //          repeated group list {
+    //            required INT32 element;
+    //          }
+    //        }
+    //      }
+    //
+    //    Apparently, we can't use the 2nd schema to read the target Parquet file, as the two
+    //    have different physical structures.
     val parquetRequestedSchema =
-      maybeRequestedSchema.map { schemaString =>
-        StructType.fromString(schemaString).map { field =>
-          val fieldType = context.getFileSchema.asGroupType().getType(field.name)
-          new MessageType("root", fieldType)
-        }.reduce(_ union _)
-      }.getOrElse(context.getFileSchema)
+      maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
+        val toParquet = new CatalystSchemaConverter(conf)
+        val fileSchema = context.getFileSchema.asGroupType()
+        val fileFieldNames = fileSchema.getFields.map(_.getName).toSet
+
+        StructType
+          // Deserializes the Catalyst schema of requested columns
+          .fromString(schemaString)
+          .map { field =>
+            if (fileFieldNames.contains(field.name)) {
+              // If the field exists in the target Parquet file, extracts the field type from
+              // the full file schema and makes a single-field Parquet schema
+              new MessageType("root", fileSchema.getType(field.name))
            } else {
+              // Otherwise, just resorts to `CatalystSchemaConverter`
+              toParquet.convert(StructType(Array(field)))
+            }
+          }
+          // Merges all single-field Parquet schemas to form a complete schema covering all
+          // requested columns. Note that it's possible that no columns are requested at all
+          // (e.g., when counting some partition column of a partitioned Parquet table). That's
+          // why `fold` is used here, so that we always fall back to an empty Parquet schema.
+          .fold(new MessageType("root")) {
+            _ union _
+          }
+      }
 
     val metadata =
       Map.empty[String, String] ++
         maybeRequestedSchema.map(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
         maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
 
+    logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
     new ReadContext(parquetRequestedSchema, metadata)
   }
 }
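The 2-level vs. 3-level LIST ambiguity described in note 2 of the new comment block can be reproduced with parquet-mr alone. The following is a minimal sketch, not part of the commit: MessageTypeParser and MessageType are real parquet-mr classes, while the object name and schema strings are illustrative, mirroring the comment. Reusing the file's own field type, as the patched init() does, is what keeps the requested schema physically compatible with the file.

import org.apache.parquet.schema.{MessageType, MessageTypeParser}

object ListLayoutDemo extends App {
  // Legacy 2-level layout, as some older Parquet writers produce it
  val fileSchema: MessageType = MessageTypeParser.parseMessageType(
    """message root {
      |  optional group f1 (LIST) {
      |    required int32 element;
      |  }
      |}""".stripMargin)

  // Standard 3-level layout, the kind a schema converter would typically emit
  val convertedSchema: MessageType = MessageTypeParser.parseMessageType(
    """message root {
      |  optional group f1 (LIST) {
      |    repeated group list {
      |      required int32 element;
      |    }
      |  }
      |}""".stripMargin)

  // Extracting the field type from the file schema preserves the 2-level layout
  val requested = new MessageType("root", fileSchema.asGroupType().getType("f1"))

  println(requested)                     // 2-level, matches the file
  println(requested == convertedSchema)  // false: different physical structures
}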

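The switch from reduce to fold covers the other case named in the commit title: when a query touches only partition columns (e.g., a count over a partition column of a partitioned table), zero columns are requested from the file, and reduce throws on the empty collection. Below is a hedged sketch of the difference, assuming parquet-mr on the classpath; the object and field names are hypothetical.

import org.apache.parquet.schema.{MessageType, PrimitiveType}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
import org.apache.parquet.schema.Type.Repetition.OPTIONAL

object EmptyProjectionDemo extends App {
  // Builds a single-field Parquet schema, like the per-column schemas merged in init()
  def singleField(name: String): MessageType =
    new MessageType("root", new PrimitiveType(OPTIONAL, INT32, name))

  val someColumns = Seq(singleField("a"), singleField("b"))
  val noColumns = Seq.empty[MessageType]

  // Non-empty projections merge fine either way
  println(someColumns.fold(new MessageType("root"))(_ union _))

  // The old code's reduce blows up when nothing is requested:
  // noColumns.reduce(_ union _)  // UnsupportedOperationException: empty.reduceLeft

  // fold with an empty MessageType as the zero element yields a valid empty schema
  println(noColumns.fold(new MessageType("root"))(_ union _))
}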