@@ -154,7 +154,7 @@ private[parquet] class ParquetRowConverter(
        |${catalystType.prettyJson}
      """.stripMargin)

-  private val UTC = DateTimeUtils.TimeZoneUTC
+  private[this] val UTC = DateTimeUtils.TimeZoneUTC

   /**
    * Updater used together with field converters within a [[ParquetRowConverter]]. It propagates
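
For context on why this PR repeatedly swaps `private val` for `private[this] val`: in Scala 2, a plain `private val` compiles to a JVM field plus a generated accessor method, whereas `private[this]` makes the member object-private, so the compiler can emit direct field reads. A minimal sketch of the distinction (hypothetical class, not from the PR):

// Minimal sketch (hypothetical class; Scala 2 semantics).
class AccessorDemo {
  private val viaAccessor = 1    // scalac emits a field plus a private getter method
  private[this] val rawField = 2 // object-private: read as a direct field access

  // In a hot loop, `rawField` skips the accessor call that `viaAccessor` goes through.
  def sum: Int = viaAccessor + rawField
}

On per-record hot paths like these converters, removing that indirection is a cheap, mechanical win.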
@@ -171,15 +171,15 @@ private[parquet] class ParquetRowConverter(
     override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
   }

-  private val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
+  private[this] val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))

   /**
    * The [[InternalRow]] converted from an entire Parquet record.
    */
   def currentRecord: InternalRow = currentRow

   // Converters for each field.
-  private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
+  private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
     parquetType.getFields.asScala.map { parquetField =>
       val fieldIndex = catalystType.fieldIndex(parquetField.getName)
       val catalystField = catalystType(fieldIndex)
@@ -188,26 +188,30 @@ private[parquet] class ParquetRowConverter(
     }.toArray
   }

+  // Updaters for each field.
+  private[this] val fieldUpdaters: Array[ParentContainerUpdater] = fieldConverters.map(_.updater)
+
   override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)

   override def end(): Unit = {
     var i = 0
-    while (i < fieldConverters.length) {
-      fieldConverters(i).updater.end()
+    while (i < fieldUpdaters.length) {
+      fieldUpdaters(i).end()
       i += 1
     }
     updater.set(currentRow)
   }

   override def start(): Unit = {
     var i = 0
-    while (i < currentRow.numFields) {
+    val numFields = currentRow.numFields
+    while (i < numFields) {
       currentRow.setNullAt(i)
       i += 1
     }
     i = 0
-    while (i < fieldConverters.length) {
-      fieldConverters(i).updater.start()
+    while (i < fieldUpdaters.length) {
+      fieldUpdaters(i).start()
       i += 1
     }
   }
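
The new `fieldUpdaters` array caches each converter's `updater` once at construction, so the per-record `start()`/`end()` loops index a flat array instead of re-resolving `fieldConverters(i).updater` for every field of every record; hoisting `numFields` out of the loop likewise avoids a method call per iteration. A minimal sketch of the pattern (hypothetical trait and class names):

// Minimal sketch (hypothetical names) of caching per-field updaters for hot loops.
trait Updater { def start(): Unit; def end(): Unit }
trait FieldConverter { def updater: Updater }

class RecordHandler(fieldConverters: Array[FieldConverter]) {
  // Resolved once at construction, not once per field per record.
  private[this] val fieldUpdaters: Array[Updater] = fieldConverters.map(_.updater)

  def endRecord(): Unit = {
    var i = 0
    while (i < fieldUpdaters.length) { // plain while loop: no closure allocation
      fieldUpdaters(i).end()
      i += 1
    }
  }
}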
@@ -464,9 +468,9 @@ private[parquet] class ParquetRowConverter(
       updater: ParentContainerUpdater)
     extends ParquetGroupConverter(updater) {

-    private var currentArray: ArrayBuffer[Any] = _
+    private[this] val currentArray = ArrayBuffer.empty[Any]

-    private val elementConverter: Converter = {
+    private[this] val elementConverter: Converter = {
       val repeatedType = parquetSchema.getType(0)
       val elementType = catalystSchema.elementType
@@ -517,20 +521,18 @@

     override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))

-    // NOTE: We can't reuse the mutable `ArrayBuffer` here and must instantiate a new buffer for
-    // the next value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects
-    // stored in row cells.
-    override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
Review thread on lines -520 to -523:

[Member] I think it depends on whether currentArray.toArray copies the elements or not?

[Contributor] ArrayBuffer.toArray should always return a fresh, unshared array object (internally, it allocates a new array and then calls copyToArray).

It doesn't copy or clone the array elements themselves, but that shouldn't be a problem: by design, the objects inserted into this buffer are unshared or immutable. The map and array converters always return unshared objects, and we always .copy() rows when inserting them into a map or array parent container (this is still true after the changes in #26993).

I did a bit of archaeology and tracked down the source of the // NOTE comment here: it was added in #7231, and at that time we were actually passing the mutable.ArrayBuffer itself to updater: https://github.com/apache/spark/blame/360fe18a61538b03cac05da1c6d258e124df6feb/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala#L321. The comment made sense in that context: with that older code, we would wind up with Row() objects that contained mutable.ArrayBuffers.

Later, in #7724 this was changed to pass a new GenericArrayData(currentArray.toArray) to the parent updater: c0cc0ea#diff-1d6c363c04155a9328fe1f5bd08a2f90. From that point on we could have safely started reusing the mutable.ArrayBuffer, since it no longer escaped its converter.

+    override def start(): Unit = currentArray.clear()
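
The claim in the thread above is easy to check in isolation: toArray snapshots the buffer into a fresh backing array, so clearing and refilling the same ArrayBuffer cannot mutate a previously emitted snapshot. A standalone sketch (plain Scala with a hypothetical object name, not Spark code):

import scala.collection.mutable.ArrayBuffer

// Minimal sketch: reusing one ArrayBuffer across "records" is safe because
// toArray allocates a fresh, unshared backing array each time.
object BufferReuseDemo extends App {
  private val buf = ArrayBuffer.empty[Any]

  buf += 1
  buf += 2
  val first = buf.toArray  // snapshot of record 1: Array(1, 2)

  buf.clear()              // reuse the same buffer for the next record
  buf += 3
  val second = buf.toArray // snapshot of record 2: Array(3)

  println(first.mkString(", "))  // 1, 2 -- unaffected by clear()
  println(second.mkString(", ")) // 3
}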

     /** Array element converter */
     private final class ElementConverter(parquetType: Type, catalystType: DataType)
       extends GroupConverter {

       private var currentElement: Any = _

-      private val converter = newConverter(parquetType, catalystType, new ParentContainerUpdater {
-        override def set(value: Any): Unit = currentElement = value
-      })
+      private[this] val converter =
+        newConverter(parquetType, catalystType, new ParentContainerUpdater {
+          override def set(value: Any): Unit = currentElement = value
+        })

       override def getConverter(fieldIndex: Int): Converter = converter

@@ -547,10 +549,10 @@
       updater: ParentContainerUpdater)
     extends ParquetGroupConverter(updater) {

-    private var currentKeys: ArrayBuffer[Any] = _
-    private var currentValues: ArrayBuffer[Any] = _
+    private[this] val currentKeys = ArrayBuffer.empty[Any]
+    private[this] val currentValues = ArrayBuffer.empty[Any]

-    private val keyValueConverter = {
+    private[this] val keyValueConverter = {
       val repeatedType = parquetType.getType(0).asGroupType()
       new KeyValueConverter(
         repeatedType.getType(0),
@@ -568,12 +570,9 @@
       updater.set(ArrayBasedMapData(currentKeys.toArray, currentValues.toArray))
     }

-    // NOTE: We can't reuse the mutable Map here and must instantiate a new `Map` for the next
-    // value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored in
-    // row cells.
     override def start(): Unit = {
-      currentKeys = ArrayBuffer.empty[Any]
-      currentValues = ArrayBuffer.empty[Any]
+      currentKeys.clear()
+      currentValues.clear()
     }

     /** Parquet converter for key-value pairs within the map. */
@@ -588,7 +587,7 @@

       private var currentValue: Any = _

-      private val converters = Array(
+      private[this] val converters = Array(
         // Converter for keys
         newConverter(parquetKeyType, catalystKeyType, new ParentContainerUpdater {
           override def set(value: Any): Unit = currentKey = value
@@ -614,10 +613,10 @@
   }

   private trait RepeatedConverter {
-    private var currentArray: ArrayBuffer[Any] = _
+    private[this] val currentArray = ArrayBuffer.empty[Any]

     protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater {
-      override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+      override def start(): Unit = currentArray.clear()
       override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
       override def set(value: Any): Unit = currentArray += value
     }
@@ -635,7 +634,7 @@

     val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)

-    private val elementConverter: PrimitiveConverter =
+    private[this] val elementConverter: PrimitiveConverter =
       newConverter(parquetType, catalystType, updater).asPrimitiveConverter()

     override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value)
@@ -662,7 +661,7 @@

     val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)

-    private val elementConverter: GroupConverter =
+    private[this] val elementConverter: GroupConverter =
       newConverter(parquetType, catalystType, updater).asGroupConverter()

     override def getConverter(field: Int): Converter = elementConverter.getConverter(field)