[SPARK-10301] [SQL] Fixes schema merging for nested structs #8509
Changes from all commits: fb4d67d, b6f4526, 6f009a2, f21d88e, 38644d8
**CatalystReadSupport.scala**
```diff
@@ -19,17 +19,18 @@ package org.apache.spark.sql.execution.datasources.parquet

 import java.util.{Map => JMap}

-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter}

 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
 import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.Type.Repetition
+import org.apache.parquet.schema._

 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._

 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
   // Called after `init()` when initializing Parquet record reader.
```
```diff
@@ -81,70 +82,10 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
     // `StructType` containing all requested columns.
     val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))

-    // Below we construct a Parquet schema containing all requested columns. This schema tells
-    // Parquet which columns to read.
-    //
-    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise,
-    // we have to fallback to the full file schema which contains all columns in the file.
-    // Obviously this may waste IO bandwidth since it may read more columns than requested.
-    //
-    // Two things to note:
-    //
-    // 1. It's possible that some requested columns don't exist in the target Parquet file. For
-    //    example, in the case of schema merging, the globally merged schema may contain extra
-    //    columns gathered from other Parquet files. These columns will be simply filled with nulls
-    //    when actually reading the target Parquet file.
-    //
-    // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to
-    //    Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to
-    //    non-standard behaviors of some Parquet libraries/tools. For example, a Parquet file
-    //    containing a single integer array field `f1` may have the following legacy 2-level
-    //    structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          required INT32 element;
-    //        }
-    //      }
-    //
-    //    while `CatalystSchemaConverter` may generate a standard 3-level structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          repeated group list {
-    //            required INT32 element;
-    //          }
-    //        }
-    //      }
-    //
-    //    Apparently, we can't use the 2nd schema to read the target Parquet file as they have
-    //    different physical structures.
     val parquetRequestedSchema =
       maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
-        val toParquet = new CatalystSchemaConverter(conf)
-        val fileSchema = context.getFileSchema.asGroupType()
-        val fileFieldNames = fileSchema.getFields.asScala.map(_.getName).toSet
-
-        StructType
-          // Deserializes the Catalyst schema of requested columns
-          .fromString(schemaString)
-          .map { field =>
-            if (fileFieldNames.contains(field.name)) {
-              // If the field exists in the target Parquet file, extracts the field type from the
-              // full file schema and makes a single-field Parquet schema
-              new MessageType("root", fileSchema.getType(field.name))
-            } else {
-              // Otherwise, just resorts to `CatalystSchemaConverter`
-              toParquet.convert(StructType(Array(field)))
-            }
-          }
-          // Merges all single-field Parquet schemas to form a complete schema for all requested
-          // columns. Note that it's possible that no columns are requested at all (e.g., count
-          // some partition column of a partitioned Parquet table). That's why `fold` is used here
-          // and always fallback to an empty Parquet schema.
-          .fold(new MessageType("root")) {
-            _ union _
-          }
+        val catalystRequestedSchema = StructType.fromString(schemaString)
+        CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
       }

     val metadata =
```
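To make the scenario concrete, here is a hypothetical repro sketch of the nested-struct schema-merging case this change addresses (not taken from the PR's test suite; the `repro` helper, the paths, and the use of `named_struct` are illustrative):

```scala
import org.apache.spark.sql.SQLContext

// Two Parquet files whose nested struct columns only partially overlap.
def repro(sqlContext: SQLContext, path: String): Unit = {
  // File 1 has s: struct<a: bigint>, file 2 has s: struct<b: bigint>.
  sqlContext.range(3).selectExpr("named_struct('a', id) AS s").write.parquet(s"$path/p=1")
  sqlContext.range(3).selectExpr("named_struct('b', id) AS s").write.parquet(s"$path/p=2")

  // The globally merged schema is s: struct<a: bigint, b: bigint>. When reading each file,
  // `clipParquetSchema` tailors the merged schema to that file's physical schema, so the
  // nested field missing from the file is padded with nulls instead of breaking the reader.
  sqlContext.read.option("mergeSchema", "true").parquet(path).show()
}
```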
```diff
@@ -160,4 +101,168 @@ private[parquet] object CatalystReadSupport {
   val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"

   val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"

+  /**
+   * Tailors `parquetSchema` according to `catalystSchema` by removing column paths that don't
+   * exist in `catalystSchema`, and adding those that only exist in `catalystSchema`.
+   */
+  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+    Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
+  }
+
+  private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+    catalystType match {
+      case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
+        // Only clips array types with nested type as element type.
+        clipParquetListType(parquetType.asGroupType(), t.elementType)
+
+      case t: MapType if !isPrimitiveCatalystType(t.valueType) =>
+        // Only clips map types with nested type as value type.
+        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+
+      case t: StructType =>
+        clipParquetGroup(parquetType.asGroupType(), t)
+
+      case _ =>
+        parquetType
+    }
+  }
+
+  /**
+   * Whether a Catalyst [[DataType]] is primitive. Primitive [[DataType]] is not equivalent to
+   * [[AtomicType]]. For example, [[CalendarIntervalType]] is primitive, but it's not an
+   * [[AtomicType]].
+   */
+  private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
+    dataType match {
+      case _: ArrayType | _: MapType | _: StructType => false
```
**Contributor:** What about UDT?

**Contributor:** Looks like for a UDT, we need to call …

**Contributor:** After chatting with @liancheng offline: we should not handle UDT here (leave it as it is).

**Contributor:** Then let's add a comment here to explain the reason.
```diff
+      case _ => true
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]]. The element type
+   * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or
+   * a [[StructType]].
+   */
+  private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
+    // Precondition of this method, should only be called for lists with nested element types.
+    assert(!isPrimitiveCatalystType(elementType))
+
+    // Unannotated repeated group should be interpreted as required list of required element, so
+    // list element type is just the group itself. Clip it.
+    if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
+      clipParquetType(parquetList, elementType)
+    } else {
+      assert(
+        parquetList.getOriginalType == OriginalType.LIST,
+        "Invalid Parquet schema. " +
+          "Original type of annotated Parquet lists must be LIST: " +
+          parquetList.toString)
+
+      assert(
+        parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED),
+        "Invalid Parquet schema. " +
+          "LIST-annotated group should only have exactly one repeated field: " +
+          parquetList)
+
+      // Precondition of this method, should only be called for lists with nested element types.
+      assert(!parquetList.getType(0).isPrimitive)
+
+      val repeatedGroup = parquetList.getType(0).asGroupType()
+
+      // If the repeated field is a group with multiple fields, or the repeated field is a group
+      // with one field and is named either "array" or uses the LIST-annotated group's name with
+      // "_tuple" appended, then the repeated type is the element type and elements are required.
+      // Build a new LIST-annotated group with clipped `repeatedGroup` as element type and the
+      // only field.
+      if (
+        repeatedGroup.getFieldCount > 1 ||
```
**Contributor:** Why do we need this case?

**Author:** This case corresponds to the 2nd rule of the LIST backward-compatibility rules defined here: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules

**Author:** Actually, this method is a direct mapping of the LIST backward-compatibility rules defined in the link above, but lists of primitive element types are not handled here, since we only care about complex element types.
```diff
+        repeatedGroup.getName == "array" ||
+        repeatedGroup.getName == parquetList.getName + "_tuple"
+      ) {
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(clipParquetType(repeatedGroup, elementType))
+          .named(parquetList.getName)
+      } else {
+        // Otherwise, the repeated field's type is the element type with the repeated field's
+        // repetition.
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(
+            Types
+              .repeatedGroup()
+              .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+              .named(repeatedGroup.getName))
+          .named(parquetList.getName)
+      }
+    }
+  }
```
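To illustrate the branch discussed above, here are two legacy LIST layouts that take it, sketched with Parquet's `MessageTypeParser` (the schemas themselves are made up). Both correspond to rule 2 of the backward-compatibility rules: the repeated group itself is the element type.

```scala
import org.apache.parquet.schema.MessageTypeParser

// Repeated group with more than one field: the group is the element type.
val legacyStructList = MessageTypeParser.parseMessageType(
  """message root {
    |  optional group f (LIST) {
    |    repeated group element {
    |      required int32 num;
    |      required binary str (UTF8);
    |    }
    |  }
    |}""".stripMargin)

// Single-field repeated group named "array": also treated as the element type itself.
val legacyArrayNamedList = MessageTypeParser.parseMessageType(
  """message root {
    |  optional group f (LIST) {
    |    repeated group array {
    |      required int32 num;
    |    }
    |  }
    |}""".stripMargin)

// In both cases `clipParquetListType` clips the repeated group directly against the
// Catalyst element type instead of descending into it as in the standard 3-level layout.
```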
```diff
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]]. The value type
+   * of the [[MapType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
+   * [[StructType]]. Note that key type of any [[MapType]] is always a primitive type.
+   */
+  private def clipParquetMapType(
+      parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
+    // Precondition of this method, should only be called for maps with nested value types.
+    assert(!isPrimitiveCatalystType(valueType))
+
+    val repeatedGroup = parquetMap.getType(0).asGroupType()
+    val parquetKeyType = repeatedGroup.getType(0)
+    val parquetValueType = repeatedGroup.getType(1)
+
+    val clippedRepeatedGroup =
+      Types
+        .repeatedGroup()
+        .as(repeatedGroup.getOriginalType)
+        .addField(parquetKeyType)
```
**Contributor:** Should we also call …

**Author:** We don't allow the key type to be a complex type in Spark SQL. This is consistent with Hive.

**Author:** Actually, although complex map keys are not allowed when using HiveQL in Spark SQL, they are allowed otherwise, and we can read/write them from/to Parquet successfully. So we do need to handle complex map keys here.

**Author:** Added support for this in #8583.
```diff
+        .addField(clipParquetType(parquetValueType, valueType))
+        .named(repeatedGroup.getName)
+
+    Types
+      .buildGroup(parquetMap.getRepetition)
+      .as(parquetMap.getOriginalType)
+      .addField(clippedRepeatedGroup)
+      .named(parquetMap.getName)
+  }
```
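For reference, a sketch of the standard Parquet MAP layout this method expects (hypothetical schema, parsed with `MessageTypeParser`): the repeated group's first field is the key and is kept untouched, while the second field, the value, is clipped recursively.

```scala
import org.apache.parquet.schema.MessageTypeParser

val mapSchema = MessageTypeParser.parseMessageType(
  """message root {
    |  optional group m (MAP) {
    |    repeated group key_value {
    |      required binary key (UTF8);
    |      required group value {
    |        required int32 a;
    |        required int32 b;
    |      }
    |    }
    |  }
    |}""".stripMargin)

// Clipping `m` against MapType(StringType, StructType(a: int)) would keep `key` as-is and
// drop `value.b`, since only the value side is recursed into by `clipParquetType`.
```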
```diff
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+   *
+   * @return A clipped [[GroupType]], which has at least one field.
+   * @note Parquet doesn't allow creating empty [[GroupType]] instances except for empty
+   *       [[MessageType]]. Because it's legal to construct an empty requested schema for column
+   *       pruning.
+   */
+  private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
+    Types
+      .buildGroup(parquetRecord.getRepetition)
+      .as(parquetRecord.getOriginalType)
+      .addFields(clippedParquetFields: _*)
+      .named(parquetRecord.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+   *
+   * @return A list of clipped [[GroupType]] fields, which can be empty.
+   */
+  private def clipParquetGroupFields(
+      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
+    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+    val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
+    structType.map { f =>
+      parquetFieldMap
+        .get(f.name)
+        .map(clipParquetType(_, f.dataType))
+        .getOrElse(toParquet.convertField(f))
+    }
+  }
+}
```
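Putting it together, here is a hedged usage sketch of `clipParquetSchema` (it is `private[parquet]`, so this assumes code living in the same package; both schemas are made up for illustration):

```scala
import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.types._

// Physical schema of one file: `s` has fields `a` and `c`, but no `b`.
val fileSchema = MessageTypeParser.parseMessageType(
  """message root {
    |  required group s {
    |    required int64 a;
    |    required int64 c;
    |  }
    |}""".stripMargin)

// Globally merged (requested) Catalyst schema: `s` has fields `a` and `b`.
val catalystSchema = new StructType()
  .add("s", new StructType()
    .add("a", LongType, nullable = false)
    .add("b", LongType, nullable = true))

// Expected result: `a` keeps the file's physical type, `b` is synthesized via
// `CatalystSchemaConverter` (and will read as null), and the file-only `c` is dropped:
//
//   message root {
//     required group s {
//       required int64 a;
//       optional int64 b;
//     }
//   }
val clipped = CatalystReadSupport.clipParquetSchema(fileSchema, catalystSchema)
```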
**CatalystRowConverter.scala**

```diff
@@ -113,31 +113,6 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUpdater)
  * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
  * any "parent" container.
  *
- * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the
- *       Parquet file being read, while constructor argument [[catalystType]] refers to requested
- *       fields of the global schema. The key difference is that, in case of schema merging,
- *       [[parquetType]] can be a subset of [[catalystType]]. For example, it's possible to have
- *       the following [[catalystType]]:
- *       {{{
- *         new StructType()
- *           .add("f1", IntegerType, nullable = false)
- *           .add("f2", StringType, nullable = true)
- *           .add("f3", new StructType()
- *             .add("f31", DoubleType, nullable = false)
- *             .add("f32", IntegerType, nullable = true)
- *             .add("f33", StringType, nullable = true), nullable = false)
- *       }}}
- *       and the following [[parquetType]] (`f2` and `f32` are missing):
- *       {{{
- *         message root {
- *           required int32 f1;
- *           required group f3 {
- *             required double f31;
- *             optional binary f33 (utf8);
- *           }
- *         }
- *       }}}
  *
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type
  * @param updater An updater which propagates converted field values to the parent container
```
```diff
@@ -179,31 +154,7 @@ private[parquet] class CatalystRowConverter(

   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    // In case of schema merging, `parquetType` can be a subset of `catalystType`. We need to pad
-    // those missing fields and create converters for them, although values of these fields are
-    // always null.
-    val paddedParquetFields = {
-      val parquetFields = parquetType.getFields.asScala
-      val parquetFieldNames = parquetFields.map(_.getName).toSet
-      val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))
-
-      // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when
-      // creating the schema converter here, since values of missing fields are always null.
-      val toParquet = new CatalystSchemaConverter()
-
-      (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f =>
-        catalystType.indexWhere(_.name == f.getName)
-      }
-    }
-
-    if (paddedParquetFields.length != catalystType.length) {
-      throw new UnsupportedOperationException(
-        "A Parquet file's schema has different number of fields with the table schema. " +
-        "Please enable schema merging by setting \"mergeSchema\" to true when load " +
-        "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in SQLConf.")
-    }
-
-    paddedParquetFields.zip(catalystType).zipWithIndex.map {
+    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
         // Converted field value should be set to the `ordinal`-th cell of `currentRow`
         newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
```

**Contributor:** Should we add an assert here to make sure `parquetType` matches `catalystType`?

**Author:** Good point, should add an assertion.

**Author:** At first I thought it would be too complicated to add this assertion here, since there can be multiple Parquet representations for a single Catalyst type, and some of them may even conflict with each other. But I just realized that we can simply resort to `CatalystSchemaConverter` to convert `parquetType` to a Catalyst type and see whether the result matches `catalystType`. This works because the mapping from Catalyst types to Parquet types is one-to-many.

**Author:** I found that adding this assertion is still a pretty big change. Since it's only defensive and doesn't affect correctness, I'd like to have it in a separate PR.
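For completeness, a minimal sketch of the deferred assertion under the approach described above (the helper name and the exact comparison are hypothetical; a production version may need to ignore nullability and metadata when comparing):

```scala
import scala.collection.JavaConverters._

import org.apache.parquet.schema.{GroupType, Types}

import org.apache.spark.sql.types.StructType

// Convert `parquetType` back to a Catalyst schema and compare: since Catalyst-to-Parquet is
// one-to-many, comparing on the Catalyst side collapses all legal Parquet representations.
def assertSchemasMatch(parquetType: GroupType, catalystType: StructType): Unit = {
  // Wrap the record's fields into a MessageType so the schema converter can process it.
  val message =
    Types.buildMessage().addFields(parquetType.getFields.asScala.toSeq: _*).named("root")
  val convertedType = new CatalystSchemaConverter().convert(message)
  // A first-order check; a stricter structural comparison could walk both schemas.
  assert(
    convertedType.length == catalystType.length,
    s"Parquet record type $parquetType is incompatible with Catalyst schema $catalystType")
}
```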