@@ -63,13 +63,17 @@ private[parquet] object ParquetTypesConverter {
6363 * Note that we apply the following conversion rules:
6464 * <ul>
6565 * <li> Primitive types are converter to the corresponding primitive type.</li>
66- * <li> Group types that have a single field with repetition `REPEATED` or themselves
67- * have repetition level `REPEATED` are converted to an [[ArrayType ]] with the
68- * corresponding field type (possibly primitive) as element type.</li>
66+ * <li> Group types that have a single field that is itself a group, which has repetition
67+ * level `REPEATED` and two fields (named `key` and `value`), are converted to
68+ * a [[MapType ]] with the corresponding key and value (value possibly complex)
69+ * as element type.</li>
6970 * <li> Other group types are converted as follows:<ul>
70- * <li> If they have a single field, they are converted into a [[StructType ]] with
71+ * <li> Group types that have a single field with repetition `REPEATED` or themselves
72+ * have repetition level `REPEATED` are converted to an [[ArrayType ]] with the
73+ * corresponding field type (possibly primitive) as element type.</li>
74+ * <li> Other groups with a single field are converted into a [[StructType ]] with
7175 * the corresponding field type.</li>
72- * <li> If they have more than one field and repetition level `REPEATED` they are
76+ * <li> If groups have more than one field and repetition level `REPEATED` they are
7377 * converted into an [[ArrayType ]] with the corresponding [[StructType ]] as complex
7478 * element type.</li>
7579 * <li> Otherwise they are converted into a [[StructType ]] with the corresponding
@@ -82,16 +86,33 @@ private[parquet] object ParquetTypesConverter {
8286 * @return The corresponding Catalyst type.
8387 */
8488 def toDataType (parquetType : ParquetType ): DataType = {
89+ def correspondsToMap (groupType : ParquetGroupType ): Boolean = {
90+ if (groupType.getFieldCount != 1 || groupType.getFields.apply(0 ).isPrimitive) {
91+ false
92+ } else {
93+ // This mostly follows the convention in ``parquet.schema.ConversionPatterns``
94+ val keyValueGroup = groupType.getFields.apply(0 ).asGroupType()
95+ keyValueGroup.getRepetition == Repetition .REPEATED &&
96+ keyValueGroup.getName == " map" &&
97+ keyValueGroup.getFields.apply(0 ).getName == " key" &&
98+ keyValueGroup.getFields.apply(1 ).getName == " value"
99+ }
100+ }
101+ def correspondsToArray (groupType : ParquetGroupType ): Boolean = {
102+ groupType.getFieldCount == 1 &&
103+ (groupType.getFields.apply(0 ).getRepetition == Repetition .REPEATED ||
104+ groupType.getRepetition == Repetition .REPEATED )
105+ }
106+
85107 if (parquetType.isPrimitive) {
86108 toPrimitiveDataType(parquetType.asPrimitiveType.getPrimitiveTypeName)
87- }
88- else {
109+ } else {
89110 val groupType = parquetType.asGroupType()
90111 parquetType.getOriginalType match {
91112 // if the schema was constructed programmatically there may be hints how to convert
92113 // it inside the metadata via the OriginalType field
93114 case ParquetOriginalType .LIST => { // TODO: check enums!
94- val fields = groupType.getFields.map {
115+ val fields = groupType.getFields.map {
95116 field => new StructField (
96117 field.getName,
97118 toDataType(field),
@@ -103,16 +124,29 @@ private[parquet] object ParquetTypesConverter {
103124 new ArrayType (StructType (fields))
104125 }
105126 }
127+ case ParquetOriginalType .MAP => {
128+ assert(
129+ ! groupType.getFields.apply(0 ).isPrimitive,
130+ " Parquet Map type malformatted: expected nested group for map!" )
131+ val keyValueGroup = groupType.getFields.apply(0 ).asGroupType()
132+ assert(
133+ keyValueGroup.getFieldCount == 2 ,
134+ " Parquet Map type malformatted: nested group should have 2 (key, value) fields!" )
135+ val keyType = toDataType(keyValueGroup.getFields.apply(0 ))
136+ val valueType = toDataType(keyValueGroup.getFields.apply(1 ))
137+ new MapType (keyType, valueType)
138+ }
106139 case _ => {
107- // everything else nested becomes a Struct, unless it has a single repeated field
108- // in which case it becomes an array (this should correspond to the inverse operation of
109- // parquet.schema.ConversionPatterns.listType)
110- if (groupType.getFieldCount == 1 &&
111- (groupType.getFields.apply(0 ).getRepetition == Repetition .REPEATED ||
112- groupType.getRepetition == Repetition .REPEATED )) {
140+ // Note: the order of these checks is important!
141+ if (correspondsToMap(groupType)) { // MapType
142+ val keyValueGroup = groupType.getFields.apply(0 ).asGroupType()
143+ val keyType = toDataType(keyValueGroup.getFields.apply(0 ))
144+ val valueType = toDataType(keyValueGroup.getFields.apply(1 ))
145+ new MapType (keyType, valueType)
146+ } else if (correspondsToArray(groupType)) { // ArrayType
113147 val elementType = toDataType(groupType.getFields.apply(0 ))
114148 new ArrayType (elementType)
115- } else {
149+ } else { // everything else: StructType
116150 val fields = groupType
117151 .getFields
118152 .map(ptype => new StructField (
@@ -164,7 +198,10 @@ private[parquet] object ParquetTypesConverter {
164198 * <ul>
165199 * <li> Primitive types are converted into Parquet's primitive types.</li>
166200 * <li> [[org.apache.spark.sql.catalyst.types.StructType ]]s are converted
167- * into Parquet's `GroupType` with the corresponding field types.</li>
201+ * into Parquet's `GroupType` with the corresponding field types.</li>
202+ * <li> [[org.apache.spark.sql.catalyst.types.MapType ]]s are converted
203+ * into a nested (2-level) Parquet `GroupType` with two fields: a key type and
204+ * a value type. The nested group has repetition level `REPEATED`.</li>
168205 * <li> [[org.apache.spark.sql.catalyst.types.ArrayType ]]s are handled as follows:<ul>
169206 * <li> If their element is complex, that is of type
170207 * [[org.apache.spark.sql.catalyst.types.StructType ]], they are converted
@@ -174,18 +211,18 @@ private[parquet] object ParquetTypesConverter {
174211 * that is also a list but has only a single field of the type corresponding to
175212 * the element type.</li></ul></li>
176213 * </ul>
177- * Parquet's repetition level is set according to the following rule:
214+ * Parquet's repetition level is generally set according to the following rule:
178215 * <ul>
179- * <li> If the call to `fromDataType` is recursive inside an enclosing `ArrayType`, then
180- * the repetition level is set to `REPEATED`.</li>
216+ * <li> If the call to `fromDataType` is recursive inside an enclosing `ArrayType` or
217+ * `MapType`, then the repetition level is set to `REPEATED`.</li>
181218 * <li> Otherwise, if the attribute whose type is converted is `nullable`, the Parquet
182219 * type gets repetition level `OPTIONAL` and otherwise `REQUIRED`.</li>
183220 * </ul>
184- * The single expection to this rule is an [[org.apache.spark.sql.catalyst.types.ArrayType ]]
221+ * The single exception to this rule is an [[org.apache.spark.sql.catalyst.types.ArrayType ]]
185222 * that contains a [[org.apache.spark.sql.catalyst.types.StructType ]], whose repetition level
186223 * is always set to `REPEATED`.
187224 *
188- @param ctype The type to convert.
225+ * @param ctype The type to convert.
189226 * @param name The name of the [[org.apache.spark.sql.catalyst.expressions.Attribute ]]
190227 * whose type is converted
191228 * @param nullable When true indicates that the attribute is nullable
@@ -239,6 +276,13 @@ private[parquet] object ParquetTypesConverter {
239276 }
240277 new ParquetGroupType (repetition, name, fields)
241278 }
279+ case MapType (keyType, valueType) => {
280+ ConversionPatterns .mapType(
281+ repetition,
282+ name,
283+ fromDataType(keyType, " key" , false , inArray = false ),
284+ fromDataType(valueType, " value" , true , inArray = false ))
285+ }
242286 case _ => sys.error(s " Unsupported datatype $ctype" )
243287 }
244288 }
0 commit comments