diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index ff5c724375c3..c6b1fa63e74f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -154,7 +154,7 @@ private[parquet] class ParquetRowConverter(
        |${catalystType.prettyJson}
      """.stripMargin)
 
-  private val UTC = DateTimeUtils.TimeZoneUTC
+  private[this] val UTC = DateTimeUtils.TimeZoneUTC
 
   /**
    * Updater used together with field converters within a [[ParquetRowConverter]]. It propagates
@@ -171,7 +171,7 @@ private[parquet] class ParquetRowConverter(
     override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
   }
 
-  private val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
+  private[this] val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
 
   /**
    * The [[InternalRow]] converted from an entire Parquet record.
@@ -179,7 +179,7 @@ private[parquet] class ParquetRowConverter(
   def currentRecord: InternalRow = currentRow
 
   // Converters for each field.
-  private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
+  private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
     parquetType.getFields.asScala.map { parquetField =>
       val fieldIndex = catalystType.fieldIndex(parquetField.getName)
       val catalystField = catalystType(fieldIndex)
@@ -188,12 +188,15 @@ private[parquet] class ParquetRowConverter(
     }.toArray
   }
 
+  // Updaters for each field.
+  private[this] val fieldUpdaters: Array[ParentContainerUpdater] = fieldConverters.map(_.updater)
+
   override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)
 
   override def end(): Unit = {
     var i = 0
-    while (i < fieldConverters.length) {
-      fieldConverters(i).updater.end()
+    while (i < fieldUpdaters.length) {
+      fieldUpdaters(i).end()
       i += 1
     }
     updater.set(currentRow)
@@ -201,13 +204,14 @@ private[parquet] class ParquetRowConverter(
 
   override def start(): Unit = {
     var i = 0
-    while (i < currentRow.numFields) {
+    val numFields = currentRow.numFields
+    while (i < numFields) {
       currentRow.setNullAt(i)
       i += 1
     }
     i = 0
-    while (i < fieldConverters.length) {
-      fieldConverters(i).updater.start()
+    while (i < fieldUpdaters.length) {
+      fieldUpdaters(i).start()
       i += 1
     }
   }
@@ -464,9 +468,9 @@ private[parquet] class ParquetRowConverter(
       updater: ParentContainerUpdater)
     extends ParquetGroupConverter(updater) {
 
-    private var currentArray: ArrayBuffer[Any] = _
+    private[this] val currentArray = ArrayBuffer.empty[Any]
 
-    private val elementConverter: Converter = {
+    private[this] val elementConverter: Converter = {
       val repeatedType = parquetSchema.getType(0)
       val elementType = catalystSchema.elementType
 
@@ -517,10 +521,7 @@ private[parquet] class ParquetRowConverter(
 
     override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
 
-    // NOTE: We can't reuse the mutable `ArrayBuffer` here and must instantiate a new buffer for the
-    // next value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored
-    // in row cells.
-    override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+    override def start(): Unit = currentArray.clear()
 
     /** Array element converter */
     private final class ElementConverter(parquetType: Type, catalystType: DataType)
@@ -528,9 +529,10 @@ private[parquet] class ParquetRowConverter(
 
     private var currentElement: Any = _
 
-    private val converter = newConverter(parquetType, catalystType, new ParentContainerUpdater {
-      override def set(value: Any): Unit = currentElement = value
-    })
+    private[this] val converter =
+      newConverter(parquetType, catalystType, new ParentContainerUpdater {
+        override def set(value: Any): Unit = currentElement = value
+      })
 
     override def getConverter(fieldIndex: Int): Converter = converter
 
@@ -547,10 +549,10 @@ private[parquet] class ParquetRowConverter(
       updater: ParentContainerUpdater)
     extends ParquetGroupConverter(updater) {
 
-    private var currentKeys: ArrayBuffer[Any] = _
-    private var currentValues: ArrayBuffer[Any] = _
+    private[this] val currentKeys = ArrayBuffer.empty[Any]
+    private[this] val currentValues = ArrayBuffer.empty[Any]
 
-    private val keyValueConverter = {
+    private[this] val keyValueConverter = {
      val repeatedType = parquetType.getType(0).asGroupType()
       new KeyValueConverter(
         repeatedType.getType(0),
@@ -568,12 +570,9 @@ private[parquet] class ParquetRowConverter(
       updater.set(ArrayBasedMapData(currentKeys.toArray, currentValues.toArray))
     }
 
-    // NOTE: We can't reuse the mutable Map here and must instantiate a new `Map` for the next
-    // value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored in row
-    // cells.
     override def start(): Unit = {
-      currentKeys = ArrayBuffer.empty[Any]
-      currentValues = ArrayBuffer.empty[Any]
+      currentKeys.clear()
+      currentValues.clear()
     }
 
     /** Parquet converter for key-value pairs within the map. */
@@ -588,7 +587,7 @@ private[parquet] class ParquetRowConverter(
 
       private var currentValue: Any = _
 
-      private val converters = Array(
+      private[this] val converters = Array(
         // Converter for keys
         newConverter(parquetKeyType, catalystKeyType, new ParentContainerUpdater {
           override def set(value: Any): Unit = currentKey = value
@@ -614,10 +613,10 @@ private[parquet] class ParquetRowConverter(
   }
 
   private trait RepeatedConverter {
-    private var currentArray: ArrayBuffer[Any] = _
+    private[this] val currentArray = ArrayBuffer.empty[Any]
 
     protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater {
-      override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+      override def start(): Unit = currentArray.clear()
       override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
       override def set(value: Any): Unit = currentArray += value
     }
@@ -635,7 +634,7 @@ private[parquet] class ParquetRowConverter(
 
     val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
 
-    private val elementConverter: PrimitiveConverter =
+    private[this] val elementConverter: PrimitiveConverter =
       newConverter(parquetType, catalystType, updater).asPrimitiveConverter()
 
     override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value)
@@ -662,7 +661,7 @@ private[parquet] class ParquetRowConverter(
 
     val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
 
-    private val elementConverter: GroupConverter =
+    private[this] val elementConverter: GroupConverter =
       newConverter(parquetType, catalystType, updater).asGroupConverter()
 
     override def getConverter(field: Int): Converter = elementConverter.getConverter(field)
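Why the `clear()`-based reuse is safe: the deleted NOTE comments warned against reusing the mutable buffers across records, but every `end()` above snapshots the buffer via `toArray` (both `GenericArrayData` and `ArrayBasedMapData` are constructed from fresh arrays) before `start()` clears it for the next record, so values already handed to the parent updater cannot be corrupted. Below is a minimal, self-contained sketch of that snapshot-then-clear invariant; the object and method names are hypothetical, not Spark's API.

import scala.collection.mutable.ArrayBuffer

// Sketch of the reuse pattern from the patch: one long-lived buffer,
// cleared per record, with toArray taking a defensive snapshot.
object BufferReuseSketch {
  private[this] val currentArray = ArrayBuffer.empty[Any]

  // Analogous to start(): reset the shared buffer instead of reallocating.
  def start(): Unit = currentArray.clear()

  // Analogous to the element updater's set(): accumulate into the buffer.
  def add(value: Any): Unit = currentArray += value

  // Analogous to end(): toArray copies into a fresh array, so a later
  // clear() cannot mutate what was already emitted.
  def end(): Array[Any] = currentArray.toArray

  def main(args: Array[String]): Unit = {
    start(); add(1); add(2); add(3)
    val first = end()
    start(); add(4); add(5)          // clears and refills the same buffer
    val second = end()
    println(first.mkString(","))     // 1,2,3 -- unaffected by the reuse
    println(second.mkString(","))    // 4,5
  }
}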
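The patch's other micro-optimizations follow the same theme: `fieldUpdaters` is computed once from `fieldConverters.map(_.updater)`, and `numFields` is hoisted out of the null-reset loop, so the per-record `start()`/`end()` loops avoid repeated virtual calls. A hypothetical sketch of that hoisting pattern (the types and names below are illustrative, not Spark's):

// Hoist per-record-invariant lookups out of the hot loop.
object HoistingSketch {
  trait Updater { def start(): Unit; def end(): Unit }

  final class FieldConverter(name: String) {
    val updater: Updater = new Updater {
      def start(): Unit = ()
      def end(): Unit = println(s"end $name")
    }
  }

  private[this] val fieldConverters = Array(new FieldConverter("a"), new FieldConverter("b"))
  // Computed once at construction, as the patch does with fieldUpdaters,
  // so the loop below skips a per-iteration `.updater` call.
  private[this] val fieldUpdaters = fieldConverters.map(_.updater)

  def endRecord(): Unit = {
    var i = 0
    while (i < fieldUpdaters.length) {
      fieldUpdaters(i).end()
      i += 1
    }
  }

  def main(args: Array[String]): Unit = endRecord()  // prints: end a, end b
}

Relatedly, switching these fields from `private` to `private[this]` makes them object-private, which lets scalac access the underlying field directly instead of going through a generated accessor method on each read.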