Skip to content

Commit f21d88e

Browse files
committed
More comments and test cases
1 parent 6f009a2 commit f21d88e

File tree

2 files changed

+110
-29
lines changed

2 files changed

+110
-29
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala

Lines changed: 69 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
1919

2020
import java.util.{Map => JMap}
2121

22-
import scala.collection.JavaConverters._
22+
import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter}
2323

2424
import org.apache.hadoop.conf.Configuration
2525
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
@@ -107,43 +107,67 @@ private[parquet] object CatalystReadSupport {
107107
* in `catalystSchema`, and adding those only exist in `catalystSchema`.
108108
*/
109109
def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
110-
val clippedGroup = clipParquetType(parquetSchema.asGroupType(), catalystSchema).asGroupType()
111-
Types.buildMessage().addFields(clippedGroup.getFields.asScala: _*).named("root")
110+
val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
111+
Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
112112
}
113113

114114
private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
115115
catalystType match {
116116
case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
117+
// Only clips array types with nested type as element type.
117118
clipParquetListType(parquetType.asGroupType(), t.elementType)
118119

119120
case t: MapType if !isPrimitiveCatalystType(t.valueType) =>
121+
// Only clips map types with nested type as value type.
120122
clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
121123

122124
case t: StructType =>
123-
clipParquetRecord(parquetType.asGroupType(), t)
125+
clipParquetGroup(parquetType.asGroupType(), t)
124126

125127
case _ =>
126128
parquetType
127129
}
128130
}
129131

132+
/**
133+
* Whether a Catalyst [[DataType]] is primitive. Primitive [[DataType]] is not equivalent to
134+
* [[AtomicType]]. For example, [[CalendarIntervalType]] is primitive, but it's not an
135+
* [[AtomicType]].
136+
*/
130137
private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
131138
dataType match {
132139
case _: ArrayType | _: MapType | _: StructType => false
133140
case _ => true
134141
}
135142
}
136143

144+
/**
145+
* Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]]. The element type
146+
* of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
147+
* [[StructType]].
148+
*/
137149
private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
150+
// Precondition of this method, should only be called for lists with nested element types.
138151
assert(!isPrimitiveCatalystType(elementType))
139152

140-
// Unannotated repeated group, list element type is just the group itself. Clip it.
153+
// Unannotated repeated group should be interpreted as required list of required element, so
154+
// list element type is just the group itself. Clip it.
141155
if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
142156
clipParquetType(parquetList, elementType)
143157
} else {
144-
assert(parquetList.getOriginalType == OriginalType.LIST)
145-
assert(parquetList.getFieldCount == 1)
146-
assert(parquetList.getType(0).isRepetition(Repetition.REPEATED))
158+
assert(
159+
parquetList.getOriginalType == OriginalType.LIST,
160+
"Invalid Parquet schema. " +
161+
"Original type of annotated Parquet lists must be LIST: " +
162+
parquetList.toString)
163+
164+
assert(
165+
parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED),
166+
"Invalid Parquet schema. " +
167+
"LIST-annotated group should only have exactly one repeated field: " +
168+
parquetList)
169+
170+
// Precondition of this method, should only be called for lists with nested element types.
147171
assert(!parquetList.getType(0).isPrimitive)
148172

149173
val repeatedGroup = parquetList.getType(0).asGroupType()
@@ -179,8 +203,14 @@ private[parquet] object CatalystReadSupport {
179203
}
180204
}
181205

206+
/**
207+
* Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. The value type
208+
* of the [[MapType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
209+
* [[StructType]]. Note that key type of any [[MapType]] is always a primitive type.
210+
*/
182211
private def clipParquetMapType(
183212
parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
213+
// Precondition of this method, should only be called for maps with nested value types.
184214
assert(!isPrimitiveCatalystType(valueType))
185215

186216
val repeatedGroup = parquetMap.getType(0).asGroupType()
@@ -202,27 +232,37 @@ private[parquet] object CatalystReadSupport {
202232
.named(parquetMap.getName)
203233
}
204234

205-
private def clipParquetRecord(parquetRecord: GroupType, structType: StructType): GroupType = {
206-
val tailoredFields = {
207-
val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
208-
val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
209-
structType.map { f =>
210-
parquetFieldMap
211-
.get(f.name)
212-
.map(clipParquetType(_, f.dataType))
213-
.getOrElse(toParquet.convertField(f))
214-
}
215-
}
235+
/**
236+
* Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
237+
*
238+
* @return A clipped [[GroupType]], which has at least one field.
239+
* @note Parquet doesn't allow creating empty [[GroupType]] instances except for empty
240+
* [[MessageType]]. Because it's legal to construct an empty requested schema for column
241+
* pruning.
242+
*/
243+
private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
244+
val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
245+
Types
246+
.buildGroup(parquetRecord.getRepetition)
247+
.as(parquetRecord.getOriginalType)
248+
.addFields(clippedParquetFields: _*)
249+
.named(parquetRecord.getName)
250+
}
216251

217-
// Here we can't use builder methods defined in `Types` to construct the `GroupType` and have to
218-
// resort to this deprecated constructor. The reason is that, `tailoredFields` can be empty,
219-
// and `Types` builder methods don't allow constructing empty group types. For example, query
220-
// `SELECT COUNT(1) FROM t` requests for zero columns.
221-
// TODO Refactor method signature to return a list of fields instead of a `GroupType`
222-
new GroupType(
223-
parquetRecord.getRepetition,
224-
parquetRecord.getName,
225-
parquetRecord.getOriginalType,
226-
tailoredFields.asJava)
252+
/**
253+
* Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
254+
*
255+
* @return A list of clipped [[GroupType]] fields, which can be empty.
256+
*/
257+
private def clipParquetGroupFields(
258+
parquetRecord: GroupType, structType: StructType): Seq[Type] = {
259+
val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
260+
val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
261+
structType.map { f =>
262+
parquetFieldMap
263+
.get(f.name)
264+
.map(clipParquetType(_, f.dataType))
265+
.getOrElse(toParquet.convertField(f))
266+
}
227267
}
228268
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,6 +1195,47 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
11951195
|}
11961196
""".stripMargin)
11971197

1198+
testSchemaClipping(
1199+
"2-level list of required struct",
1200+
1201+
parquetSchema =
1202+
s"""message root {
1203+
| required group f0 {
1204+
| required group f00 (LIST) {
1205+
| repeated group element {
1206+
| required int32 f000;
1207+
| optional int64 f001;
1208+
| }
1209+
| }
1210+
| }
1211+
|}
1212+
""".stripMargin,
1213+
1214+
catalystSchema = {
1215+
val f00ElementType =
1216+
new StructType()
1217+
.add("f001", LongType, nullable = true)
1218+
.add("f002", DoubleType, nullable = false)
1219+
1220+
val f00Type = ArrayType(f00ElementType, containsNull = false)
1221+
val f0Type = new StructType().add("f00", f00Type, nullable = false)
1222+
1223+
new StructType().add("f0", f0Type, nullable = false)
1224+
},
1225+
1226+
expectedSchema =
1227+
s"""message root {
1228+
| required group f0 {
1229+
| required group f00 (LIST) {
1230+
| repeated group element {
1231+
| optional int64 f001;
1232+
| required double f002;
1233+
| }
1234+
| }
1235+
| }
1236+
|}
1237+
""".stripMargin)
1238+
11981239
testSchemaClipping(
11991240
"empty requested schema",
12001241

0 commit comments

Comments
 (0)