@@ -318,10 +318,34 @@ private[parquet] class ParquetRowConverter(
        new ParquetMapConverter(parquetType.asGroupType(), t, updater)

      case t: StructType =>
        val wrappedUpdater = {
Review comment (Member): @JoshRosen, no big deal at all but how about we put the JIRA ID somewhere in the comment?

Reply (Contributor, Author): Good idea: I added a JIRA reference in e6945e8

          // SPARK-30338: avoid unnecessary InternalRow copying for nested structs.
          // There are two cases to handle here:
          //
          // 1. Parent container is a map or array: we must make a deep copy of the mutable row
          //    because this converter may be invoked multiple times per Parquet input record
          //    (if the map or array contains multiple elements).
          //
          // 2. Parent container is a struct: we don't need to copy the row here because either:
          //
          //    (a) all ancestors are structs and therefore no copying is required because this
          //        converter will only be invoked once per Parquet input record, or
          //    (b) some ancestor is a struct that is nested in a map or array and that ancestor's
          //        converter will perform deep-copying (which will recursively copy this row).
          if (updater.isInstanceOf[RowUpdater]) {
            // `updater` is a RowUpdater, implying that the parent container is a struct.
            updater
          } else {
            // `updater` is NOT a RowUpdater, implying that the parent container is a map or array.
            new ParentContainerUpdater {
              override def set(value: Any): Unit = {
                updater.set(value.asInstanceOf[SpecificInternalRow].copy()) // deep copy
              }
            }
          }
        }
        new ParquetRowConverter(
-         schemaConverter, parquetType.asGroupType(), t, convertTz, new ParentContainerUpdater {
-           override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
-         })
          schemaConverter, parquetType.asGroupType(), t, convertTz, wrappedUpdater)

      case t =>
        throw new RuntimeException(
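To make case 1 above concrete, here is a minimal, self-contained Scala sketch of the aliasing bug that the deep copy prevents. `MutableRow` and `ReuseDemo` are illustrative names, not Spark classes: one mutable row is reused for every element of a container, so storing it without copying leaves every slot pointing at the same object.

import scala.collection.mutable.ArrayBuffer

// Illustrative stand-in for Spark's mutable internal row; not a Spark class.
final class MutableRow(var value: String) {
  def copy(): MutableRow = new MutableRow(value)
}

object ReuseDemo extends App {
  val scratch = new MutableRow("")  // one row instance reused for every element
  val noCopy = ArrayBuffer.empty[MutableRow]
  val deep = ArrayBuffer.empty[MutableRow]

  for (v <- Seq("a", "b")) {
    scratch.value = v               // the converter overwrites the row in place
    noCopy += scratch               // stores an alias to the shared object
    deep += scratch.copy()          // stores a snapshot of the current contents
  }

  println(noCopy.map(_.value))      // ArrayBuffer(b, b): earlier element clobbered
  println(deep.map(_.value))        // ArrayBuffer(a, b): correct
}

Case 2 skips the copy because a struct parent consumes the row exactly once per Parquet record, so no later write can clobber it.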
@@ -204,6 +204,42 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
    }
  }

testStandardAndLegacyModes("array of struct") {
Review comment (Member): Do we have a test for array of struct of struct?

Reply (Contributor): I added a new test case for this in 0f1af94

    val data = (1 to 4).map { i =>
      Tuple1(
        Seq(
          Tuple1(s"1st_val_$i"),
          Tuple1(s"2nd_val_$i")
        )
      )
    }
    withParquetDataFrame(data) { df =>
      // Structs are converted to `Row`s
      checkAnswer(df, data.map { case Tuple1(array) =>
        Row(array.map(struct => Row(struct.productIterator.toSeq: _*)))
      })
    }
  }
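For context on the harness: `testStandardAndLegacyModes` registers the same test body once per Parquet writer mode. A rough sketch of what such a helper presumably looks like, assuming a suite that mixes in ScalaTest's `test` and Spark's `withSQLConf` (the real helper lives in `ParquetTest` and may differ in detail):

// Sketch only; assumes `test` (ScalaTest) and `withSQLConf` (Spark SQL test
// utilities) are in scope, plus `import org.apache.spark.sql.internal.SQLConf`.
protected def testStandardAndLegacyModes(testName: String)(f: => Unit): Unit = {
  test(s"Standard mode - $testName") {
    withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") { f }
  }
  test(s"Legacy mode - $testName") {
    withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") { f }
  }
}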

testStandardAndLegacyModes("array of nested struct") {
val data = (1 to 4).map { i =>
Tuple1(
Seq(
Tuple1(
Tuple1(s"1st_val_$i")),
Tuple1(
Tuple1(s"2nd_val_$i"))
)
)
}
withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
checkAnswer(df, data.map { case Tuple1(array) =>
Row(array.map { case Tuple1(Tuple1(str)) => Row(Row(str))})
})
}
}

testStandardAndLegacyModes("nested struct with array of array as field") {
val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
withParquetDataFrame(data) { df =>
Expand All @@ -214,9 +250,34 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

testStandardAndLegacyModes("nested map with struct as key type") {
val data = (1 to 4).map { i =>
Tuple1(
Map(
(i, s"kA_$i") -> s"vA_$i",
(i, s"kB_$i") -> s"vB_$i"
)
)
}
withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
checkAnswer(df, data.map { case Tuple1(m) =>
Row(m.map { case (k, v) => Row(k.productIterator.toSeq: _*) -> v })
})
}
}
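Hand-expanding the expected answer for `i = 1` makes the struct-key conversion above concrete (illustrative only; the test derives this from `data` programmatically):

import org.apache.spark.sql.Row

// For i = 1 the (Int, String) tuple keys become Rows; values pass through.
val expectedForI1: Row = Row(Map(Row(1, "kA_1") -> "vA_1", Row(1, "kB_1") -> "vB_1"))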

testStandardAndLegacyModes("nested map with struct as value type") {
val data = (1 to 4).map(i => Tuple1(Map(i -> ((i, s"val_$i")))))
val data = (1 to 4).map { i =>
Tuple1(
Map(
s"kA_$i" -> ((i, s"vA_$i")),
s"kB_$i" -> ((i, s"vB_$i"))
)
)
}
withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
checkAnswer(df, data.map { case Tuple1(m) =>
Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
})