
Commit 7a1a5db

JoshRosen (joshrosen-stripe) authored and committed
[SPARK-30414][SQL] ParquetRowConverter optimizations: arrays, maps, plus misc. constant factors
### What changes were proposed in this pull request?

This PR implements multiple performance optimizations for `ParquetRowConverter`, achieving some modest constant-factor wins for all fields and larger wins for map and array fields:

- Add `private[this]` to several `val`s (90cebf0)
- Keep a `fieldUpdaters` array, saving two `.updater()` calls per field (7318785): I suspect that these are often megamorphic calls, so cutting these out seems like it could be a relatively large performance win.
- Only call `currentRow.numFields` once per `start()` call (e05de15): previously we'd call it once per field, and this had a significant enough cost that it was visible during profiling.
- Reuse buffers in array and map converters (c7d1534, 6d16f59): previously we would create a brand-new Scala `ArrayBuffer` for each field read, but this isn't actually necessary because the data is already copied into a fresh array when `end()` constructs a `GenericArrayData`. See the sketch after this summary.

### Why are the changes needed?

To improve Parquet read performance; this is complementary to #26993's (orthogonal) improvements for nested struct read performance.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests, plus manual benchmarking with both synthetic and realistic schemas (similar to the ones in #26993). I've seen ~10%+ improvements in scan performance on certain real-world datasets.

Closes #27089 from JoshRosen/joshrosen/more-ParquetRowConverter-optimizations.

Lead-authored-by: Josh Rosen <[email protected]>
Co-authored-by: Josh Rosen <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
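Below is a minimal, self-contained Scala sketch of the three main patterns described above. This is not the actual Spark code: `Updater`, `FieldConverter`, and the two `*Sketch` classes are simplified stand-ins invented here for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for Spark's converter/updater interfaces.
trait Updater { def start(): Unit; def end(): Unit }
trait FieldConverter { def updater: Updater }

// Patterns 1 and 2: `private[this]` fields plus a cached updater array.
// `private[this]` lets the Scala 2 compiler emit direct field accesses
// instead of accessor-method calls, and materializing the updaters once
// avoids repeated (potentially megamorphic) `.updater` calls per record.
final class RowConverterSketch(fieldConverters: Array[FieldConverter]) {
  private[this] val fieldUpdaters: Array[Updater] = fieldConverters.map(_.updater)

  def start(): Unit = {
    // Hoisting the loop bound into a local mirrors caching
    // `currentRow.numFields` once per `start()` call.
    val numFields = fieldUpdaters.length
    var i = 0
    while (i < numFields) {
      fieldUpdaters(i).start()
      i += 1
    }
  }
}

// Pattern 3: reuse one buffer across records. Clearing is safe only
// because `end()` publishes a copy (`toArray`), never the mutable
// buffer itself, so later mutation cannot corrupt earlier results.
final class ArrayConverterSketch(publish: Array[Any] => Unit) {
  private[this] val currentArray = ArrayBuffer.empty[Any]

  def start(): Unit = currentArray.clear() // retains backing storage
  def add(element: Any): Unit = currentArray += element
  def end(): Unit = publish(currentArray.toArray) // defensive copy
}
```

The removed `NOTE` comments in the diff below warned against reusing the buffers; per the PR description, reuse is in fact safe because `end()` already copies the buffer contents via `toArray` before constructing a `GenericArrayData` or `ArrayBasedMapData`.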
1 parent: 93d3ab8 · commit: 7a1a5db

1 file changed: +29 −30 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala

Lines changed: 29 additions & 30 deletions
@@ -154,7 +154,7 @@ private[parquet] class ParquetRowConverter(
        |${catalystType.prettyJson}
      """.stripMargin)
 
-  private val UTC = DateTimeUtils.TimeZoneUTC
+  private[this] val UTC = DateTimeUtils.TimeZoneUTC
 
   /**
    * Updater used together with field converters within a [[ParquetRowConverter]]. It propagates
@@ -171,15 +171,15 @@
     override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
   }
 
-  private val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
+  private[this] val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
 
   /**
    * The [[InternalRow]] converted from an entire Parquet record.
    */
   def currentRecord: InternalRow = currentRow
 
   // Converters for each field.
-  private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
+  private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
     parquetType.getFields.asScala.map { parquetField =>
       val fieldIndex = catalystType.fieldIndex(parquetField.getName)
       val catalystField = catalystType(fieldIndex)
@@ -188,26 +188,30 @@
     }.toArray
   }
 
+  // Updaters for each field.
+  private[this] val fieldUpdaters: Array[ParentContainerUpdater] = fieldConverters.map(_.updater)
+
   override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)
 
   override def end(): Unit = {
     var i = 0
-    while (i < fieldConverters.length) {
-      fieldConverters(i).updater.end()
+    while (i < fieldUpdaters.length) {
+      fieldUpdaters(i).end()
       i += 1
     }
     updater.set(currentRow)
   }
 
   override def start(): Unit = {
     var i = 0
-    while (i < currentRow.numFields) {
+    val numFields = currentRow.numFields
+    while (i < numFields) {
       currentRow.setNullAt(i)
       i += 1
     }
     i = 0
-    while (i < fieldConverters.length) {
-      fieldConverters(i).updater.start()
+    while (i < fieldUpdaters.length) {
+      fieldUpdaters(i).start()
       i += 1
     }
   }
@@ -488,9 +492,9 @@
       updater: ParentContainerUpdater)
     extends ParquetGroupConverter(updater) {
 
-    private var currentArray: ArrayBuffer[Any] = _
+    private[this] val currentArray = ArrayBuffer.empty[Any]
 
-    private val elementConverter: Converter = {
+    private[this] val elementConverter: Converter = {
       val repeatedType = parquetSchema.getType(0)
       val elementType = catalystSchema.elementType
 
@@ -541,20 +545,18 @@
 
     override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
 
-    // NOTE: We can't reuse the mutable `ArrayBuffer` here and must instantiate a new buffer for the
-    // next value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored
-    // in row cells.
-    override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+    override def start(): Unit = currentArray.clear()
 
     /** Array element converter */
     private final class ElementConverter(parquetType: Type, catalystType: DataType)
       extends GroupConverter {
 
       private var currentElement: Any = _
 
-      private val converter = newConverter(parquetType, catalystType, new ParentContainerUpdater {
-        override def set(value: Any): Unit = currentElement = value
-      })
+      private[this] val converter =
+        newConverter(parquetType, catalystType, new ParentContainerUpdater {
+          override def set(value: Any): Unit = currentElement = value
+        })
 
       override def getConverter(fieldIndex: Int): Converter = converter
 
@@ -571,10 +573,10 @@
       updater: ParentContainerUpdater)
     extends ParquetGroupConverter(updater) {
 
-    private var currentKeys: ArrayBuffer[Any] = _
-    private var currentValues: ArrayBuffer[Any] = _
+    private[this] val currentKeys = ArrayBuffer.empty[Any]
+    private[this] val currentValues = ArrayBuffer.empty[Any]
 
-    private val keyValueConverter = {
+    private[this] val keyValueConverter = {
       val repeatedType = parquetType.getType(0).asGroupType()
       new KeyValueConverter(
         repeatedType.getType(0),
@@ -592,12 +594,9 @@
       updater.set(ArrayBasedMapData(currentKeys.toArray, currentValues.toArray))
     }
 
-    // NOTE: We can't reuse the mutable Map here and must instantiate a new `Map` for the next
-    // value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored in row
-    // cells.
     override def start(): Unit = {
-      currentKeys = ArrayBuffer.empty[Any]
-      currentValues = ArrayBuffer.empty[Any]
+      currentKeys.clear()
+      currentValues.clear()
     }
 
     /** Parquet converter for key-value pairs within the map. */
@@ -612,7 +611,7 @@
 
     private var currentValue: Any = _
 
-    private val converters = Array(
+    private[this] val converters = Array(
       // Converter for keys
       newConverter(parquetKeyType, catalystKeyType, new ParentContainerUpdater {
         override def set(value: Any): Unit = currentKey = value
@@ -638,10 +637,10 @@
   }
 
   private trait RepeatedConverter {
-    private var currentArray: ArrayBuffer[Any] = _
+    private[this] val currentArray = ArrayBuffer.empty[Any]
 
     protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater {
-      override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+      override def start(): Unit = currentArray.clear()
       override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
       override def set(value: Any): Unit = currentArray += value
     }
@@ -659,7 +658,7 @@
 
     val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
 
-    private val elementConverter: PrimitiveConverter =
+    private[this] val elementConverter: PrimitiveConverter =
       newConverter(parquetType, catalystType, updater).asPrimitiveConverter()
 
     override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value)
@@ -686,7 +685,7 @@
 
     val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
 
-    private val elementConverter: GroupConverter =
+    private[this] val elementConverter: GroupConverter =
       newConverter(parquetType, catalystType, updater).asGroupConverter()
 
     override def getConverter(field: Int): Converter = elementConverter.getConverter(field)
