
Commit 391e6be

[SPARK-10301] [SQL] Fixes schema merging for nested structs
This PR can be quite challenging to review. I'm trying to give a detailed description of the problem as well as its solution here.

When reading Parquet files, we need to specify a potentially nested Parquet schema (of type `MessageType`) as the requested schema for column pruning. This Parquet schema is translated from a Catalyst schema (of type `StructType`), which is generated by the query planner and represents all requested columns. However, this translation can be fairly complicated for several reasons:

1.  The requested schema must conform to the real schema of the physical file to be read.

    This means we have to tailor the actual file schema of every individual physical Parquet file to be read according to the given Catalyst schema. Fortunately, we are already doing this in Spark 1.5 by pushing requested schema conversion to the executor side in PR #7231.

2.  Support for schema merging.

    A single Parquet dataset may consist of multiple physical Parquet files that come with different but compatible schemas. This means we may request a column path that doesn't exist in a physical Parquet file. All requested column paths can be nested. For example, for a Parquet file schema

    ```
    message root {
      required group f0 {
        required group f00 {
          required int32 f000;
          required binary f001 (UTF8);
        }
      }
    }
    ```

    we may request the column paths defined in the following schema:

    ```
    message root {
      required group f0 {
        required group f00 {
          required binary f001 (UTF8);
          required float f002;
        }
      }

      optional double f1;
    }
    ```

    Notice that we pruned column path `f0.f00.f000`, but added `f0.f00.f002` and `f1`.

    The good news is that Parquet handles non-existing column paths properly and always returns null for them.

3.  The map from `StructType` to `MessageType` is a one-to-many map.

    This is the most unfortunate part.

    Due to historical reasons (dark histories!), schemas of Parquet files generated by different libraries have different "flavors". For example, to handle a schema with a single non-nullable column, whose type is an array of non-nullable integers, parquet-protobuf generates the following Parquet schema:

    ```
    message m0 {
      repeated int32 f;
    }
    ```

    while parquet-avro generates another version:

    ```
    message m1 {
      required group f (LIST) {
        repeated int32 array;
      }
    }
    ```

    and parquet-thrift spills this:

    ```
    message m2 {
      required group f (LIST) {
        repeated int32 f_tuple;
      }
    }
    ```

    All of them can be mapped to the following _unique_ Catalyst schema:

    ```
    StructType(
      StructField(
        "f",
        ArrayType(IntegerType, containsNull = false),
        nullable = false))
    ```

    This greatly complicates Parquet requested schema construction, since the path of a given column varies from case to case. To read the array elements from files with the above schemas, we must use `f` for `m0`, `f.array` for `m1`, and `f.f_tuple` for `m2`.

In earlier Spark versions, we didn't try to fix this issue properly. Spark 1.4 and prior versions simply translate the Catalyst schema in a way that is more or less compatible with parquet-hive and parquet-avro, but is broken in many other cases. Earlier revisions of Spark 1.5 only try to tailor the Parquet file schema at the first level and ignore nested ones. This caused [SPARK-10301] [spark-10301] as well as [SPARK-10005] [spark-10005]. In PR #8228, I tried to avoid the hard part of the problem and made a minimal change in `CatalystRowConverter` to fix SPARK-10005. However, when taking SPARK-10301 into consideration, continuing to hack `CatalystRowConverter` doesn't seem to be a good idea. So this PR is an attempt to fix the problem in a proper way.
For a given physical Parquet file with schema `ps` and a compatible Catalyst requested schema `cs`, we use the following algorithm to tailor `ps` and obtain the resulting Parquet requested schema `ps'`. For each leaf column path `c` in `cs`:

- if `c` exists in `cs` and a corresponding Parquet column path `c'` can be found in `ps`, `c'` should be included in `ps'`;
- otherwise, we convert `c` to a Parquet column path `c"` using `CatalystSchemaConverter`, and include `c"` in `ps'`;
- no other column paths should exist in `ps'`.

Then comes the most tedious part:

> Given `cs`, `ps`, and `c`, how do we locate `c'` in `ps`?

Unfortunately, there's no quick answer, and we have to enumerate all possible structures defined in the parquet-format spec. They are:

1. the standard structure of nested types, and
2. the cases defined in all backwards-compatibility rules for `LIST` and `MAP`.

The core part of this PR is `CatalystReadSupport.clipParquetType()`, which tailors a given Parquet file schema according to a requested schema in its Catalyst form. Backwards-compatibility rules for `LIST` and `MAP` are covered in `clipParquetListType()` and `clipParquetMapType()` respectively. The column path selection algorithm is implemented in `clipParquetGroupFields()`.

With this PR, we no longer need to do schema tailoring in `CatalystReadSupport` and `CatalystRowConverter`. Another benefit is that we can now also read Parquet datasets consisting of files that have different physical Parquet schemas but share the same logical schema, for example, files generated by different Parquet libraries. This situation is illustrated by [this test case] [test-case].

[spark-10301]: https://issues.apache.org/jira/browse/SPARK-10301
[spark-10005]: https://issues.apache.org/jira/browse/SPARK-10005
[test-case]: liancheng@38644d8#diff-a9b98e28ce3ae30641829dffd1173be2R26

Author: Cheng Lian <[email protected]>

Closes #8509 from liancheng/spark-10301/fix-parquet-requested-schema.
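To make the clipping described above concrete, here is a minimal illustrative sketch (editor-added, not taken from the PR or its tests) that applies the new `CatalystReadSupport.clipParquetSchema()` entry point to the example schemas from the description. It assumes the snippet lives inside the `org.apache.spark.sql.execution.datasources.parquet` package, since `CatalystReadSupport` is `private[parquet]`; the variable names are hypothetical.

```scala
import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.types._

// Physical schema of one Parquet file in the dataset (taken from the description above).
val fileSchema = MessageTypeParser.parseMessageType(
  """message root {
    |  required group f0 {
    |    required group f00 {
    |      required int32 f000;
    |      required binary f001 (UTF8);
    |    }
    |  }
    |}
  """.stripMargin)

// Globally merged Catalyst schema of the requested columns.
val requestedSchema = new StructType()
  .add("f0", new StructType()
    .add("f00", new StructType()
      .add("f001", StringType, nullable = false)
      .add("f002", FloatType, nullable = false),
      nullable = false),
    nullable = false)
  .add("f1", DoubleType)

// Expected behavior: `f0.f00.f000` is pruned; `f0.f00.f002` and `f1` don't exist in the file,
// so they are converted with CatalystSchemaConverter and added back, and Parquet will return
// nulls for them at read time.
println(CatalystReadSupport.clipParquetSchema(fileSchema, requestedSchema))
```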
Parent commit: d65656c

File tree: 7 files changed (+653, −125 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala

Lines changed: 170 additions & 65 deletions
```diff
@@ -19,17 +19,18 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.{Map => JMap}
 
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
 import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.Type.Repetition
+import org.apache.parquet.schema._
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
   // Called after `init()` when initializing Parquet record reader.
@@ -81,70 +82,10 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
     // `StructType` containing all requested columns.
     val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
 
-    // Below we construct a Parquet schema containing all requested columns. This schema tells
-    // Parquet which columns to read.
-    //
-    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise,
-    // we have to fallback to the full file schema which contains all columns in the file.
-    // Obviously this may waste IO bandwidth since it may read more columns than requested.
-    //
-    // Two things to note:
-    //
-    // 1. It's possible that some requested columns don't exist in the target Parquet file. For
-    //    example, in the case of schema merging, the globally merged schema may contain extra
-    //    columns gathered from other Parquet files. These columns will be simply filled with nulls
-    //    when actually reading the target Parquet file.
-    //
-    // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to
-    //    Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to
-    //    non-standard behaviors of some Parquet libraries/tools. For example, a Parquet file
-    //    containing a single integer array field `f1` may have the following legacy 2-level
-    //    structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          required INT32 element;
-    //        }
-    //      }
-    //
-    //    while `CatalystSchemaConverter` may generate a standard 3-level structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          repeated group list {
-    //            required INT32 element;
-    //          }
-    //        }
-    //      }
-    //
-    //    Apparently, we can't use the 2nd schema to read the target Parquet file as they have
-    //    different physical structures.
     val parquetRequestedSchema =
       maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
-        val toParquet = new CatalystSchemaConverter(conf)
-        val fileSchema = context.getFileSchema.asGroupType()
-        val fileFieldNames = fileSchema.getFields.asScala.map(_.getName).toSet
-
-        StructType
-          // Deserializes the Catalyst schema of requested columns
-          .fromString(schemaString)
-          .map { field =>
-            if (fileFieldNames.contains(field.name)) {
-              // If the field exists in the target Parquet file, extracts the field type from the
-              // full file schema and makes a single-field Parquet schema
-              new MessageType("root", fileSchema.getType(field.name))
-            } else {
-              // Otherwise, just resorts to `CatalystSchemaConverter`
-              toParquet.convert(StructType(Array(field)))
-            }
-          }
-          // Merges all single-field Parquet schemas to form a complete schema for all requested
-          // columns. Note that it's possible that no columns are requested at all (e.g., count
-          // some partition column of a partitioned Parquet table). That's why `fold` is used here
-          // and always fallback to an empty Parquet schema.
-          .fold(new MessageType("root")) {
-            _ union _
-          }
+        val catalystRequestedSchema = StructType.fromString(schemaString)
+        CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
       }
 
     val metadata =
@@ -160,4 +101,168 @@ private[parquet] object CatalystReadSupport {
   val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
 
   val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  /**
+   * Tailors `parquetSchema` according to `catalystSchema` by removing column paths don't exist
+   * in `catalystSchema`, and adding those only exist in `catalystSchema`.
+   */
+  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+    Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
+  }
+
+  private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+    catalystType match {
+      case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
+        // Only clips array types with nested type as element type.
+        clipParquetListType(parquetType.asGroupType(), t.elementType)
+
+      case t: MapType if !isPrimitiveCatalystType(t.valueType) =>
+        // Only clips map types with nested type as value type.
+        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+
+      case t: StructType =>
+        clipParquetGroup(parquetType.asGroupType(), t)
+
+      case _ =>
+        parquetType
+    }
+  }
+
+  /**
+   * Whether a Catalyst [[DataType]] is primitive. Primitive [[DataType]] is not equivalent to
+   * [[AtomicType]]. For example, [[CalendarIntervalType]] is primitive, but it's not an
+   * [[AtomicType]].
+   */
+  private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
+    dataType match {
+      case _: ArrayType | _: MapType | _: StructType => false
+      case _ => true
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]]. The element type
+   * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
+   * [[StructType]].
+   */
+  private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
+    // Precondition of this method, should only be called for lists with nested element types.
+    assert(!isPrimitiveCatalystType(elementType))
+
+    // Unannotated repeated group should be interpreted as required list of required element, so
+    // list element type is just the group itself. Clip it.
+    if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
+      clipParquetType(parquetList, elementType)
+    } else {
+      assert(
+        parquetList.getOriginalType == OriginalType.LIST,
+        "Invalid Parquet schema. " +
+          "Original type of annotated Parquet lists must be LIST: " +
+          parquetList.toString)
+
+      assert(
+        parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED),
+        "Invalid Parquet schema. " +
+          "LIST-annotated group should only have exactly one repeated field: " +
+          parquetList)
+
+      // Precondition of this method, should only be called for lists with nested element types.
+      assert(!parquetList.getType(0).isPrimitive)
+
+      val repeatedGroup = parquetList.getType(0).asGroupType()
+
+      // If the repeated field is a group with multiple fields, or the repeated field is a group
+      // with one field and is named either "array" or uses the LIST-annotated group's name with
+      // "_tuple" appended then the repeated type is the element type and elements are required.
+      // Build a new LIST-annotated group with clipped `repeatedGroup` as element type and the
+      // only field.
+      if (
+        repeatedGroup.getFieldCount > 1 ||
+          repeatedGroup.getName == "array" ||
+          repeatedGroup.getName == parquetList.getName + "_tuple"
+      ) {
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(clipParquetType(repeatedGroup, elementType))
+          .named(parquetList.getName)
+      } else {
+        // Otherwise, the repeated field's type is the element type with the repeated field's
+        // repetition.
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(
+            Types
+              .repeatedGroup()
+              .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+              .named(repeatedGroup.getName))
+          .named(parquetList.getName)
+      }
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. The value type
+   * of the [[MapType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
+   * [[StructType]]. Note that key type of any [[MapType]] is always a primitive type.
+   */
+  private def clipParquetMapType(
+      parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
+    // Precondition of this method, should only be called for maps with nested value types.
+    assert(!isPrimitiveCatalystType(valueType))
+
+    val repeatedGroup = parquetMap.getType(0).asGroupType()
+    val parquetKeyType = repeatedGroup.getType(0)
+    val parquetValueType = repeatedGroup.getType(1)
+
+    val clippedRepeatedGroup =
+      Types
+        .repeatedGroup()
+        .as(repeatedGroup.getOriginalType)
+        .addField(parquetKeyType)
+        .addField(clipParquetType(parquetValueType, valueType))
+        .named(repeatedGroup.getName)
+
+    Types
+      .buildGroup(parquetMap.getRepetition)
+      .as(parquetMap.getOriginalType)
+      .addField(clippedRepeatedGroup)
+      .named(parquetMap.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+   *
+   * @return A clipped [[GroupType]], which has at least one field.
+   * @note Parquet doesn't allow creating empty [[GroupType]] instances except for empty
+   *       [[MessageType]]. Because it's legal to construct an empty requested schema for column
+   *       pruning.
+   */
+  private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
+    Types
+      .buildGroup(parquetRecord.getRepetition)
+      .as(parquetRecord.getOriginalType)
+      .addFields(clippedParquetFields: _*)
+      .named(parquetRecord.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+   *
+   * @return A list of clipped [[GroupType]] fields, which can be empty.
+   */
+  private def clipParquetGroupFields(
+      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
+    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+    val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
+    structType.map { f =>
+      parquetFieldMap
+        .get(f.name)
+        .map(clipParquetType(_, f.dataType))
+        .getOrElse(toParquet.convertField(f))
+    }
+  }
 }
```
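The backwards-compatibility handling in `clipParquetListType()` above can be illustrated with a small hedged sketch (editor-added, not from the PR; it assumes the same `private[parquet]` package placement as the earlier sketch, and uses a made-up parquet-avro style legacy 2-level list whose repeated group is named `array`):

```scala
import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.types._

// Legacy 2-level LIST layout with a struct element containing two fields.
val legacyFileSchema = MessageTypeParser.parseMessageType(
  """message root {
    |  optional group f (LIST) {
    |    repeated group array {
    |      required int32 x;
    |      required int32 y;
    |    }
    |  }
    |}
  """.stripMargin)

// Request only the nested field `x` of the array elements.
val requestedSchema = new StructType()
  .add("f", ArrayType(new StructType().add("x", IntegerType, nullable = false),
    containsNull = false))

// Expected result: the repeated group keeps its legacy name "array" (so the file can still be
// read), while the unused nested field `y` is pruned:
//
//   message root {
//     optional group f (LIST) {
//       repeated group array {
//         required int32 x;
//       }
//     }
//   }
println(CatalystReadSupport.clipParquetSchema(legacyFileSchema, requestedSchema))
```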

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala

Lines changed: 1 addition & 50 deletions
```diff
@@ -113,31 +113,6 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
  * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
  * any "parent" container.
  *
- * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the
- *       Parquet file being read, while constructor argument [[catalystType]] refers to requested
- *       fields of the global schema. The key difference is that, in case of schema merging,
- *       [[parquetType]] can be a subset of [[catalystType]]. For example, it's possible to have
- *       the following [[catalystType]]:
- *       {{{
- *         new StructType()
- *           .add("f1", IntegerType, nullable = false)
- *           .add("f2", StringType, nullable = true)
- *           .add("f3", new StructType()
- *             .add("f31", DoubleType, nullable = false)
- *             .add("f32", IntegerType, nullable = true)
- *             .add("f33", StringType, nullable = true), nullable = false)
- *       }}}
- *       and the following [[parquetType]] (`f2` and `f32` are missing):
- *       {{{
- *         message root {
- *           required int32 f1;
- *           required group f3 {
- *             required double f31;
- *             optional binary f33 (utf8);
- *           }
- *         }
- *       }}}
- *
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type
  * @param updater An updater which propagates converted field values to the parent container
@@ -179,31 +154,7 @@ private[parquet] class CatalystRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    // In case of schema merging, `parquetType` can be a subset of `catalystType`. We need to pad
-    // those missing fields and create converters for them, although values of these fields are
-    // always null.
-    val paddedParquetFields = {
-      val parquetFields = parquetType.getFields.asScala
-      val parquetFieldNames = parquetFields.map(_.getName).toSet
-      val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))
-
-      // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when
-      // creating the schema converter here, since values of missing fields are always null.
-      val toParquet = new CatalystSchemaConverter()
-
-      (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f =>
-        catalystType.indexWhere(_.name == f.getName)
-      }
-    }
-
-    if (paddedParquetFields.length != catalystType.length) {
-      throw new UnsupportedOperationException(
-        "A Parquet file's schema has different number of fields with the table schema. " +
-        "Please enable schema merging by setting \"mergeSchema\" to true when load " +
-        "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in SQLConf.")
-    }
-
-    paddedParquetFields.zip(catalystType).zipWithIndex.map {
+    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
         // Converted field value should be set to the `ordinal`-th cell of `currentRow`
        newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
```
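Since the row converter now relies entirely on the clipped Parquet schema (missing requested fields are added back by `clipParquetGroupFields()` and read as nulls), the end-to-end effect of this commit can be sketched roughly as follows. This is a hedged illustration rather than the PR's own test: the `SQLContext` handle, the output `path`, and the use of `named_struct` to build nested columns are all assumptions of the sketch.

```scala
import org.apache.spark.sql.SQLContext

def nestedSchemaMergingDemo(sqlContext: SQLContext, path: String): Unit = {
  // Two Parquet files whose nested struct column `s` has different but compatible schemas.
  sqlContext.range(3).selectExpr("named_struct('a', id, 'b', id) AS s").write.parquet(s"$path/p=1")
  sqlContext.range(3).selectExpr("named_struct('b', id, 'c', id) AS s").write.parquet(s"$path/p=2")

  // With schema merging enabled, the merged schema of `s` is struct<a, b, c>; nested fields
  // missing from a particular file come back as null instead of failing the read.
  val df = sqlContext.read.option("mergeSchema", "true").parquet(path)
  df.printSchema()
  df.show()
}
```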

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala

Lines changed: 4 additions & 10 deletions
```diff
@@ -55,16 +55,10 @@ import org.apache.spark.sql.{AnalysisException, SQLConf}
  * to old style non-standard behaviors.
  */
 private[parquet] class CatalystSchemaConverter(
-    private val assumeBinaryIsString: Boolean,
-    private val assumeInt96IsTimestamp: Boolean,
-    private val followParquetFormatSpec: Boolean) {
-
-  // Only used when constructing converter for converting Spark SQL schema to Parquet schema, in
-  // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are irrelevant.
-  def this() = this(
-    assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
-    assumeInt96IsTimestamp = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
-    followParquetFormatSpec = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get)
+    assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
+    assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+    followParquetFormatSpec: Boolean = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get
+) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.io.File
 import java.nio.ByteBuffer
 import java.util.{List => JList, Map => JMap}
 
```
