Commit bf03fe6
[SPARK-10136] [SQL] A more robust fix for SPARK-10136
PR #8341 is a valid fix for SPARK-10136, but it didn't catch the real root cause. The real problem can be rather tricky to explain, and requires the audience to be pretty familiar with the parquet-format spec, especially the details of the `LIST` backwards-compatibility rules. Let me have a try at an explanation here.

The structure of the problematic Parquet schema generated by parquet-avro is something like this:

```
message m {
  <repetition> group f (LIST) {         // Level 1
    repeated group array (LIST) {       // Level 2
      repeated <primitive-type> array;  // Level 3
    }
  }
}
```

(The schema generated by parquet-thrift is structurally similar; just replace the `array` at level 2 with `f_tuple`, and the one at level 3 with `f_tuple_tuple`.)

This structure consists of two nested legacy 2-level `LIST`-like structures:

1. The repeated group type at level 2 is the element type of the outer array defined at level 1.

   This group should map to a `CatalystArrayConverter.ElementConverter` when building converters.

2. The repeated primitive type at level 3 is the element type of the inner array defined at level 2.

   This type should also map to a `CatalystArrayConverter.ElementConverter`.

The root cause of SPARK-10136 is that the group at level 2 isn't properly recognized as the element type of level 1. Thus, according to the parquet-format spec, the repeated primitive at level 3 is left as a so-called "unannotated repeated primitive type", and is recognized as a required list of required primitive values, so a `RepeatedPrimitiveConverter` instead of a `CatalystArrayConverter.ElementConverter` is created for it.

According to the parquet-format spec, an unannotated repeated type shouldn't appear in a `LIST`- or `MAP`-annotated group. PR #8341 fixed this issue by allowing such unannotated repeated types to appear in `LIST`-annotated groups, which is a non-standard, hacky, but valid fix. (I didn't realize this when authoring #8341, though.)
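For concreteness, applying the substitution described above (`f_tuple` at level 2, `f_tuple_tuple` at level 3) gives the parquet-thrift shape of the same schema. This is a sketch derived from the text, not verbatim parquet-thrift output; `<repetition>` and `<primitive-type>` are placeholders as before:

```
message m {
  <repetition> group f (LIST) {                 // Level 1
    repeated group f_tuple (LIST) {             // Level 2
      repeated <primitive-type> f_tuple_tuple;  // Level 3
    }
  }
}
```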
As for why level 2 isn't recognized as a list element type, it's because of the following `LIST` backwards-compatibility rule defined in the parquet-format spec:

> If the repeated field is a group with one field and is named either `array` or uses the `LIST`-annotated group's name with `_tuple` appended, then the repeated type is the element type and elements are required.

(The `array` part is for parquet-avro compatibility, while the `_tuple` part is for parquet-thrift.)

This rule is implemented in [`CatalystSchemaConverter.isElementType`][1], but neglected in [`CatalystRowConverter.isElementType`][2]. This PR delivers a more robust fix by adding this rule to the latter method.

Note that parquet-avro 1.7.0 also suffers from this issue. Details can be found at [PARQUET-364][3].

[1]: https://github.com/apache/spark/blob/85f9a61357994da5023b08b0a8a2eb09388ce7f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala#L259-L305
[2]: https://github.com/apache/spark/blob/85f9a61357994da5023b08b0a8a2eb09388ce7f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala#L456-L463
[3]: https://issues.apache.org/jira/browse/PARQUET-364

Author: Cheng Lian <[email protected]>

Closes #8361 from liancheng/spark-10136/proper-version.
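The backwards-compatibility rule above can be sketched in Scala with a toy model of Parquet types. Note that `ParquetType`, `PrimitiveType`, `GroupType`, and this `isElementType` are simplified stand-ins for illustration, not the real `org.apache.parquet.schema` API or the actual Spark method (which also inspects the Catalyst element type):

```scala
// Toy model of Parquet types, just enough to express the rule.
sealed trait ParquetType { def name: String }
case class PrimitiveType(name: String) extends ParquetType
case class GroupType(name: String, fields: List[ParquetType]) extends ParquetType

// Sketch of the check: a repeated group with exactly one field is itself the
// element type when it is named "array" (parquet-avro) or "<parent>_tuple"
// (parquet-thrift); a repeated primitive is always an element.
def isElementType(repeatedType: ParquetType, parentName: String): Boolean =
  repeatedType match {
    case _: PrimitiveType                                     => true
    case GroupType(_, fields) if fields.size > 1              => true
    case GroupType("array", _ :: Nil)                         => true
    case GroupType(n, _ :: Nil) if n == parentName + "_tuple" => true
    case _                                                    => false
  }

// With this check, the level-2 group from the problematic schema is now
// recognized as the element of the outer list:
// isElementType(GroupType("array", List(PrimitiveType("array"))), "f") == true
```

Without the last two `GroupType` cases, the level-2 group named `array` under `f` falls through, which is exactly how the level-3 primitive ended up treated as an unannotated repeated type.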
1 parent: df7041d · commit: bf03fe6

File tree: 1 file changed (+8, −10 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala

Lines changed: 8 additions & 10 deletions

```diff
@@ -415,8 +415,9 @@ private[parquet] class CatalystRowConverter(
     private val elementConverter: Converter = {
       val repeatedType = parquetSchema.getType(0)
       val elementType = catalystSchema.elementType
+      val parentName = parquetSchema.getName

-      if (isElementType(repeatedType, elementType)) {
+      if (isElementType(repeatedType, elementType, parentName)) {
         newConverter(repeatedType, elementType, new ParentContainerUpdater {
           override def set(value: Any): Unit = currentArray += value
         })
@@ -453,10 +454,13 @@ private[parquet] class CatalystRowConverter(
      * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
      */
     // scalastyle:on
-    private def isElementType(parquetRepeatedType: Type, catalystElementType: DataType): Boolean = {
+    private def isElementType(
+        parquetRepeatedType: Type, catalystElementType: DataType, parentName: String): Boolean = {
       (parquetRepeatedType, catalystElementType) match {
         case (t: PrimitiveType, _) => true
         case (t: GroupType, _) if t.getFieldCount > 1 => true
+        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" => true
+        case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parentName + "_tuple" => true
         case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
         case _ => false
       }
@@ -474,15 +478,9 @@ private[parquet] class CatalystRowConverter(

       override def getConverter(fieldIndex: Int): Converter = converter

-      override def end(): Unit = {
-        converter.updater.end()
-        currentArray += currentElement
-      }
+      override def end(): Unit = currentArray += currentElement

-      override def start(): Unit = {
-        converter.updater.start()
-        currentElement = null
-      }
+      override def start(): Unit = currentElement = null
     }
   }
```